diff --git a/pkg/deploy/controller/config_change_controller.go b/pkg/deploy/controller/config_change_controller.go index 3e58dc539b24..cb6872c515cb 100644 --- a/pkg/deploy/controller/config_change_controller.go +++ b/pkg/deploy/controller/config_change_controller.go @@ -1,15 +1,17 @@ package controller import ( + "fmt" + + "github.com/golang/glog" + kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - cache "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + kerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" runtime "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" util "github.com/GoogleCloudPlatform/kubernetes/pkg/util" deployapi "github.com/openshift/origin/pkg/deploy/api" deployutil "github.com/openshift/origin/pkg/deploy/util" - - "github.com/golang/glog" ) // DeploymentConfigChangeController watches for changes to DeploymentConfigs and regenerates them only @@ -17,27 +19,23 @@ import ( type DeploymentConfigChangeController struct { ChangeStrategy ChangeStrategy NextDeploymentConfig func() *deployapi.DeploymentConfig - DeploymentStore cache.Store Codec runtime.Codec // Stop is an optional channel that controls when the controller exits Stop <-chan struct{} } -// ChangeStrategy knows how to generate and update DeploymentConfigs. -type ChangeStrategy interface { - GenerateDeploymentConfig(namespace, name string) (*deployapi.DeploymentConfig, error) - UpdateDeploymentConfig(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) -} - // Run watches for config change events. func (dc *DeploymentConfigChangeController) Run() { - go util.Until(func() { dc.HandleDeploymentConfig() }, 0, dc.Stop) + go util.Until(func() { + err := dc.HandleDeploymentConfig(dc.NextDeploymentConfig()) + if err != nil { + glog.Errorf("%v", err) + } + }, 0, dc.Stop) } // HandleDeploymentConfig handles the next DeploymentConfig change that happens. -func (dc *DeploymentConfigChangeController) HandleDeploymentConfig() { - config := dc.NextDeploymentConfig() - +func (dc *DeploymentConfigChangeController) HandleDeploymentConfig(config *deployapi.DeploymentConfig) error { hasChangeTrigger := false for _, trigger := range config.Triggers { if trigger.Type == deployapi.DeploymentTriggerOnConfigChange { @@ -47,59 +45,57 @@ func (dc *DeploymentConfigChangeController) HandleDeploymentConfig() { } if !hasChangeTrigger { - glog.V(4).Infof("Config has no change trigger; skipping") - return + glog.V(4).Infof("Ignoring config %s; no change triggers detected", labelFor(config)) + return nil } if config.LatestVersion == 0 { - glog.V(4).Infof("Creating new deployment for config %v", config.Name) - dc.generateDeployment(config, nil) - return + _, _, err := dc.generateDeployment(config) + if err != nil { + return fmt.Errorf("couldn't create initial deployment for config %s: %v", labelFor(config), err) + } + glog.V(4).Infof("Created initial deployment for config %s", labelFor(config)) + return nil } - latestDeploymentID := deployutil.LatestDeploymentNameForConfig(config) - obj, exists, err := dc.DeploymentStore.Get(&kapi.ReplicationController{ObjectMeta: kapi.ObjectMeta{Name: latestDeploymentID, Namespace: config.Namespace}}) + latestDeploymentName := deployutil.LatestDeploymentNameForConfig(config) + deployment, err := dc.ChangeStrategy.GetDeployment(config.Namespace, latestDeploymentName) if err != nil { - glog.Errorf("Unable to retrieve deployment from store: %v", err) - } - - if !exists { - glog.V(4).Info("Ignoring config change due to lack of existing deployment") - return + if kerrors.IsNotFound(err) { + glog.V(4).Info("Ignoring config change for %s; no existing deployment found", labelFor(config)) + return nil + } + return fmt.Errorf("couldn't retrieve deployment for %s: %v", labelFor(config), err) } - deployment := obj.(*kapi.ReplicationController) - deployedConfig, err := deployutil.DecodeDeploymentConfig(deployment, dc.Codec) if err != nil { - glog.V(0).Infof("Error decoding deploymentConfig from deployment %s: %v", deployment.Name, err) - return + return fmt.Errorf("error decoding deploymentConfig from deployment %s for config %s: %v", labelForDeployment(deployment), labelFor(config), err) } if deployutil.PodSpecsEqual(config.Template.ControllerTemplate.Template.Spec, deployedConfig.Template.ControllerTemplate.Template.Spec) { - glog.V(4).Infof("Ignoring updated config %s with LatestVersion=%d because it matches deployed config %s", config.Name, config.LatestVersion, deployment.Name) - return + glog.V(4).Infof("Ignoring config change for %s (latestVersion=%d); same as deployment %s", labelFor(config), config.LatestVersion, labelForDeployment(deployment)) + return nil } - glog.V(4).Infof("Diff:\n%s", util.ObjectDiff(config.Template.ControllerTemplate.Template.Spec, deployedConfig.Template.ControllerTemplate.Template.Spec)) - dc.generateDeployment(config, deployment) + fromVersion, toVersion, err := dc.generateDeployment(config) + if err != nil { + return fmt.Errorf("couldn't generate deployment for config %s: %v", labelFor(config), err) + } + glog.V(4).Infof("Updated config %s from version %d to %d for existing deployment %s", labelFor(config), fromVersion, toVersion, labelForDeployment(deployment)) + return nil } -func (dc *DeploymentConfigChangeController) generateDeployment(config *deployapi.DeploymentConfig, deployment *kapi.ReplicationController) { +func (dc *DeploymentConfigChangeController) generateDeployment(config *deployapi.DeploymentConfig) (int, int, error) { newConfig, err := dc.ChangeStrategy.GenerateDeploymentConfig(config.Namespace, config.Name) if err != nil { - glog.V(2).Infof("Error generating new version of deploymentConfig %v: %#v", config.Name, err) - return + return config.LatestVersion, 0, fmt.Errorf("Error generating new version of deploymentConfig %s: %v", labelFor(config), err) } if newConfig.LatestVersion == config.LatestVersion { newConfig.LatestVersion++ } - if deployment != nil { - glog.V(4).Infof("Updating config %s (LatestVersion: %d -> %d) to advance existing deployment %s", config.Name, config.LatestVersion, newConfig.LatestVersion, deployment.Name) - } - // set the trigger details for the new deployment config causes := []*deployapi.DeploymentCause{} causes = append(causes, @@ -114,6 +110,34 @@ func (dc *DeploymentConfigChangeController) generateDeployment(config *deployapi // okay - we can just ignore the update for the old resource and any changes to the more // current config will be captured in future events. if _, err = dc.ChangeStrategy.UpdateDeploymentConfig(config.Namespace, newConfig); err != nil { - glog.V(2).Infof("Error updating deploymentConfig %v: %#v", config.Name, err) + return config.LatestVersion, newConfig.LatestVersion, fmt.Errorf("couldn't update deploymentConfig %s: %v", labelFor(config), err) } + + return config.LatestVersion, newConfig.LatestVersion, nil +} + +// ChangeStrategy knows how to generate and update DeploymentConfigs. +type ChangeStrategy interface { + GetDeployment(namespace, name string) (*kapi.ReplicationController, error) + GenerateDeploymentConfig(namespace, name string) (*deployapi.DeploymentConfig, error) + UpdateDeploymentConfig(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) +} + +// ChangeStrategyImpl is a pluggable ChangeStrategy. +type ChangeStrategyImpl struct { + GetDeploymentFunc func(namespace, name string) (*kapi.ReplicationController, error) + GenerateDeploymentConfigFunc func(namespace, name string) (*deployapi.DeploymentConfig, error) + UpdateDeploymentConfigFunc func(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) +} + +func (i *ChangeStrategyImpl) GetDeployment(namespace, name string) (*kapi.ReplicationController, error) { + return i.GetDeploymentFunc(namespace, name) +} + +func (i *ChangeStrategyImpl) GenerateDeploymentConfig(namespace, name string) (*deployapi.DeploymentConfig, error) { + return i.GenerateDeploymentConfigFunc(namespace, name) +} + +func (i *ChangeStrategyImpl) UpdateDeploymentConfig(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { + return i.UpdateDeploymentConfigFunc(namespace, config) } diff --git a/pkg/deploy/controller/config_change_controller_test.go b/pkg/deploy/controller/config_change_controller_test.go index 080938e2b77d..ca9edd4582bd 100644 --- a/pkg/deploy/controller/config_change_controller_test.go +++ b/pkg/deploy/controller/config_change_controller_test.go @@ -8,43 +8,31 @@ import ( api "github.com/openshift/origin/pkg/api/latest" deployapi "github.com/openshift/origin/pkg/deploy/api" deployapitest "github.com/openshift/origin/pkg/deploy/api/test" - deploytest "github.com/openshift/origin/pkg/deploy/controller/test" deployutil "github.com/openshift/origin/pkg/deploy/util" ) // Test the controller's response to a new DeploymentConfig with a config change trigger. func TestNewConfigWithoutTrigger(t *testing.T) { - generated := false - updated := false - controller := &DeploymentConfigChangeController{ Codec: api.Codec, - ChangeStrategy: &testChangeStrategy{ + ChangeStrategy: &ChangeStrategyImpl{ GenerateDeploymentConfigFunc: func(namespace, name string) (*deployapi.DeploymentConfig, error) { - generated = true + t.Fatalf("unexpected generation of deploymentConfig") return nil, nil }, UpdateDeploymentConfigFunc: func(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { - updated = true + t.Fatalf("unexpected update of deploymentConfig") return config, nil }, }, - NextDeploymentConfig: func() *deployapi.DeploymentConfig { - config := deployapitest.OkDeploymentConfig(1) - config.Triggers = []deployapi.DeploymentTriggerPolicy{} - return config - }, - DeploymentStore: deploytest.NewFakeDeploymentStore(nil), } - controller.HandleDeploymentConfig() - - if generated { - t.Error("Unexpected generation of deploymentConfig") - } + config := deployapitest.OkDeploymentConfig(1) + config.Triggers = []deployapi.DeploymentTriggerPolicy{} + err := controller.HandleDeploymentConfig(config) - if updated { - t.Error("Unexpected update of deploymentConfig") + if err != nil { + t.Fatalf("unexpected error: %v", err) } } @@ -53,7 +41,7 @@ func TestNewConfigWithTrigger(t *testing.T) { controller := &DeploymentConfigChangeController{ Codec: api.Codec, - ChangeStrategy: &testChangeStrategy{ + ChangeStrategy: &ChangeStrategyImpl{ GenerateDeploymentConfigFunc: func(namespace, name string) (*deployapi.DeploymentConfig, error) { return deployapitest.OkDeploymentConfig(1), nil }, @@ -62,15 +50,15 @@ func TestNewConfigWithTrigger(t *testing.T) { return config, nil }, }, - NextDeploymentConfig: func() *deployapi.DeploymentConfig { - config := deployapitest.OkDeploymentConfig(0) - config.Triggers = []deployapi.DeploymentTriggerPolicy{deployapitest.OkConfigChangeTrigger()} - return config - }, - DeploymentStore: deploytest.NewFakeDeploymentStore(nil), } - controller.HandleDeploymentConfig() + config := deployapitest.OkDeploymentConfig(0) + config.Triggers = []deployapi.DeploymentTriggerPolicy{deployapitest.OkConfigChangeTrigger()} + err := controller.HandleDeploymentConfig(config) + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } if updated == nil { t.Fatalf("expected config to be updated") @@ -92,11 +80,10 @@ func TestNewConfigWithTrigger(t *testing.T) { // Test the controller's response when the pod template is changed func TestChangeWithTemplateDiff(t *testing.T) { var updated *deployapi.DeploymentConfig - deployment, _ := deployutil.MakeDeployment(deployapitest.OkDeploymentConfig(1), kapi.Codec) controller := &DeploymentConfigChangeController{ Codec: api.Codec, - ChangeStrategy: &testChangeStrategy{ + ChangeStrategy: &ChangeStrategyImpl{ GenerateDeploymentConfigFunc: func(namespace, name string) (*deployapi.DeploymentConfig, error) { return deployapitest.OkDeploymentConfig(2), nil }, @@ -104,17 +91,21 @@ func TestChangeWithTemplateDiff(t *testing.T) { updated = config return config, nil }, + GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { + deployment, _ := deployutil.MakeDeployment(deployapitest.OkDeploymentConfig(1), kapi.Codec) + return deployment, nil + }, }, - NextDeploymentConfig: func() *deployapi.DeploymentConfig { - config := deployapitest.OkDeploymentConfig(1) - config.Triggers = []deployapi.DeploymentTriggerPolicy{deployapitest.OkConfigChangeTrigger()} - config.Template.ControllerTemplate.Template.Spec.Containers[1].Name = "modified" - return config - }, - DeploymentStore: deploytest.NewFakeDeploymentStore(deployment), } - controller.HandleDeploymentConfig() + config := deployapitest.OkDeploymentConfig(1) + config.Triggers = []deployapi.DeploymentTriggerPolicy{deployapitest.OkConfigChangeTrigger()} + config.Template.ControllerTemplate.Template.Spec.Containers[1].Name = "modified" + err := controller.HandleDeploymentConfig(config) + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } if updated == nil { t.Fatalf("expected config to be updated") @@ -137,14 +128,12 @@ func TestChangeWithoutTemplateDiff(t *testing.T) { config := deployapitest.OkDeploymentConfig(1) config.Triggers = []deployapi.DeploymentTriggerPolicy{deployapitest.OkConfigChangeTrigger()} - deployment, _ := deployutil.MakeDeployment(deployapitest.OkDeploymentConfig(1), kapi.Codec) - generated := false updated := false controller := &DeploymentConfigChangeController{ Codec: api.Codec, - ChangeStrategy: &testChangeStrategy{ + ChangeStrategy: &ChangeStrategyImpl{ GenerateDeploymentConfigFunc: func(namespace, name string) (*deployapi.DeploymentConfig, error) { generated = true return config, nil @@ -153,14 +142,18 @@ func TestChangeWithoutTemplateDiff(t *testing.T) { updated = true return config, nil }, + GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { + deployment, _ := deployutil.MakeDeployment(deployapitest.OkDeploymentConfig(1), kapi.Codec) + return deployment, nil + }, }, - NextDeploymentConfig: func() *deployapi.DeploymentConfig { - return config - }, - DeploymentStore: deploytest.NewFakeDeploymentStore(deployment), } - controller.HandleDeploymentConfig() + err := controller.HandleDeploymentConfig(config) + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } if generated { t.Error("Unexpected generation of deploymentConfig") @@ -170,16 +163,3 @@ func TestChangeWithoutTemplateDiff(t *testing.T) { t.Error("Unexpected update of deploymentConfig") } } - -type testChangeStrategy struct { - GenerateDeploymentConfigFunc func(namespace, name string) (*deployapi.DeploymentConfig, error) - UpdateDeploymentConfigFunc func(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) -} - -func (i *testChangeStrategy) GenerateDeploymentConfig(namespace, name string) (*deployapi.DeploymentConfig, error) { - return i.GenerateDeploymentConfigFunc(namespace, name) -} - -func (i *testChangeStrategy) UpdateDeploymentConfig(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { - return i.UpdateDeploymentConfigFunc(namespace, config) -} diff --git a/pkg/deploy/controller/deployment_config_controller.go b/pkg/deploy/controller/deployment_config_controller.go index fbbab001edc7..7128a38792fc 100644 --- a/pkg/deploy/controller/deployment_config_controller.go +++ b/pkg/deploy/controller/deployment_config_controller.go @@ -3,11 +3,12 @@ package controller import ( "fmt" + "github.com/golang/glog" + kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" - "github.com/golang/glog" deployapi "github.com/openshift/origin/pkg/deploy/api" deployutil "github.com/openshift/origin/pkg/deploy/util" @@ -20,8 +21,8 @@ import ( // Deployments are represented by ReplicationControllers. The DeploymentConfig used to create the // ReplicationController is encoded and stored in an annotation on the ReplicationController. type DeploymentConfigController struct { - // DeploymentInterface provides access to Deployments. - DeploymentInterface dccDeploymentInterface + // DeploymentClient provides access to Deployments. + DeploymentClient DeploymentConfigControllerDeploymentClient // NextDeploymentConfig blocks until the next DeploymentConfig is available. NextDeploymentConfig func() *deployapi.DeploymentConfig // Codec is used to encode DeploymentConfigs which are stored on deployments. @@ -30,63 +31,84 @@ type DeploymentConfigController struct { Stop <-chan struct{} } -// dccDeploymentInterface is a small private interface for dealing with Deployments. -type dccDeploymentInterface interface { - GetDeployment(namespace, name string) (*kapi.ReplicationController, error) - CreateDeployment(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) -} - -// Process DeploymentConfig events one at a time. +// Run processes DeploymentConfigs one at a time until the Stop channel unblocks. func (c *DeploymentConfigController) Run() { - go util.Until(c.HandleDeploymentConfig, 0, c.Stop) + go util.Until(func() { + err := c.HandleDeploymentConfig(c.NextDeploymentConfig()) + if err != nil { + glog.Errorf("%v", err) + } + }, 0, c.Stop) } -// Process a single DeploymentConfig event. -func (c *DeploymentConfigController) HandleDeploymentConfig() { - config := c.NextDeploymentConfig() - deploy, err := c.shouldDeploy(config) - if err != nil { - util.HandleError(fmt.Errorf("unable to decide whether to redeploy %s: %v", labelFor(config), err)) - return +// HandleDeploymentConfig examines the current state of a DeploymentConfig, and creates a new +// deployment for the config if the following conditions are true: +// +// 1. The config version is greater than 0 +// 2. No deployment exists corresponding to the config's version +// +// If the config can't be processed, an error is returned. +func (c *DeploymentConfigController) HandleDeploymentConfig(config *deployapi.DeploymentConfig) error { + // Only deploy when the version has advanced past 0. + if config.LatestVersion == 0 { + glog.V(5).Infof("Waiting for first version of %s", labelFor(config)) + return nil } - if !deploy { - return + + // Find any existing deployment, and return if one already exists. + if deployment, err := c.DeploymentClient.GetDeployment(config.Namespace, deployutil.LatestDeploymentNameForConfig(config)); err != nil { + if !errors.IsNotFound(err) { + return fmt.Errorf("error looking up existing deployment for config %s: %v", labelFor(config), err) + } + } else { + // If there's an existing deployment, nothing needs to be done. + if deployment != nil { + return nil + } } + // Try and build a deployment for the config. deployment, err := deployutil.MakeDeployment(config, c.Codec) if err != nil { - util.HandleError(fmt.Errorf("unable to create deployment for %s: %v", labelFor(config), err)) - return + return fmt.Errorf("couldn't make deployment from (potentially invalid) config %s: %v", labelFor(config), err) } - glog.V(4).Infof("Deploying %s", labelFor(config)) - if _, deployErr := c.DeploymentInterface.CreateDeployment(config.Namespace, deployment); deployErr != nil { - util.HandleError(fmt.Errorf("unable to create deployment %s: %v", labelFor(config), err)) - return + // Create the deployment. + if _, err := c.DeploymentClient.CreateDeployment(config.Namespace, deployment); err == nil { + glog.V(4).Infof("Created deployment for config %s", labelFor(config)) + return nil + } else { + // If the deployment was already created, just move on. The cache could be stale, or another + // process could have already handled this update. + if errors.IsAlreadyExists(err) { + glog.V(4).Infof("Deployment already exists for config %s", labelFor(config)) + return nil + } + return fmt.Errorf("couldn't create deployment for config %s: %v", labelFor(config), err) } } -// shouldDeploy returns true if the DeploymentConfig should have a new Deployment created. -func (c *DeploymentConfigController) shouldDeploy(config *deployapi.DeploymentConfig) (bool, error) { - if config.LatestVersion == 0 { - glog.V(5).Infof("Waiting for first version of %s", labelFor(config)) - return false, nil - } +// labelFor builds a string identifier for a DeploymentConfig. +func labelFor(config *deployapi.DeploymentConfig) string { + return fmt.Sprintf("%s/%s:%d", config.Namespace, config.Name, config.LatestVersion) +} - latestDeploymentID := deployutil.LatestDeploymentNameForConfig(config) - deployment, err := c.DeploymentInterface.GetDeployment(config.Namespace, latestDeploymentID) +// DeploymentConfigControllerDeploymentClient abstracts access to deployments. +type DeploymentConfigControllerDeploymentClient interface { + GetDeployment(namespace, name string) (*kapi.ReplicationController, error) + CreateDeployment(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) +} - if err != nil { - if errors.IsNotFound(err) { - return true, nil - } - return false, err - } +// DeploymentConfigControllerDeploymentClientImpl is a pluggable deploymentConfigControllerDeploymentClient. +type DeploymentConfigControllerDeploymentClientImpl struct { + GetDeploymentFunc func(namespace, name string) (*kapi.ReplicationController, error) + CreateDeploymentFunc func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) +} - glog.V(5).Infof("Found deployment for %s - %s:%s", labelFor(config), deployment.UID, deployment.ResourceVersion) - return false, nil +func (i *DeploymentConfigControllerDeploymentClientImpl) GetDeployment(namespace, name string) (*kapi.ReplicationController, error) { + return i.GetDeploymentFunc(namespace, name) } -func labelFor(config *deployapi.DeploymentConfig) string { - return fmt.Sprintf("%s/%s:%d", config.Namespace, config.Name, config.LatestVersion) +func (i *DeploymentConfigControllerDeploymentClientImpl) CreateDeployment(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + return i.CreateDeploymentFunc(namespace, deployment) } diff --git a/pkg/deploy/controller/deployment_config_controller_test.go b/pkg/deploy/controller/deployment_config_controller_test.go index 45b71e6bc6e1..819cb416d157 100644 --- a/pkg/deploy/controller/deployment_config_controller_test.go +++ b/pkg/deploy/controller/deployment_config_controller_test.go @@ -1,13 +1,13 @@ package controller import ( + "fmt" "testing" kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" kerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" api "github.com/openshift/origin/pkg/api/latest" - deployapi "github.com/openshift/origin/pkg/deploy/api" deploytest "github.com/openshift/origin/pkg/deploy/api/test" deployutil "github.com/openshift/origin/pkg/deploy/util" ) @@ -15,7 +15,7 @@ import ( func TestHandleNewDeploymentConfig(t *testing.T) { controller := &DeploymentConfigController{ Codec: api.Codec, - DeploymentInterface: &testDeploymentInterface{ + DeploymentClient: &DeploymentConfigControllerDeploymentClientImpl{ GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { t.Fatalf("unexpected call with name %s", name) return nil, nil @@ -25,47 +25,70 @@ func TestHandleNewDeploymentConfig(t *testing.T) { return nil, nil }, }, - NextDeploymentConfig: func() *deployapi.DeploymentConfig { - return deploytest.OkDeploymentConfig(0) - }, } - controller.HandleDeploymentConfig() + err := controller.HandleDeploymentConfig(deploytest.OkDeploymentConfig(0)) + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } } -func TestHandleInitialDeployment(t *testing.T) { +func TestHandleUpdatedDeploymentConfigOk(t *testing.T) { deploymentConfig := deploytest.OkDeploymentConfig(1) var deployed *kapi.ReplicationController controller := &DeploymentConfigController{ Codec: api.Codec, - DeploymentInterface: &testDeploymentInterface{ + DeploymentClient: &DeploymentConfigControllerDeploymentClientImpl{ GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { - return nil, kerrors.NewNotFound("replicationController", name) + return nil, kerrors.NewNotFound("ReplicationController", name) }, CreateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { deployed = deployment return deployment, nil }, }, - NextDeploymentConfig: func() *deployapi.DeploymentConfig { - return deploymentConfig - }, } - controller.HandleDeploymentConfig() + err := controller.HandleDeploymentConfig(deploymentConfig) if deployed == nil { t.Fatalf("expected a deployment") } + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } } -func TestHandleConfigChangeLatestAlreadyDeployed(t *testing.T) { +func TestHandleUpdatedDeploymentConfigLookupFailure(t *testing.T) { + controller := &DeploymentConfigController{ + Codec: api.Codec, + DeploymentClient: &DeploymentConfigControllerDeploymentClientImpl{ + GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { + return nil, kerrors.NewInternalError(fmt.Errorf("test error")) + }, + CreateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + t.Fatalf("unexpected call with deployment %v", deployment) + return nil, nil + }, + }, + } + + err := controller.HandleDeploymentConfig(deploytest.OkDeploymentConfig(1)) + + if err == nil { + t.Fatalf("expected error") + } +} + +func TestHandleUpdatedDeploymentConfigAlreadyDeployed(t *testing.T) { deploymentConfig := deploytest.OkDeploymentConfig(0) controller := &DeploymentConfigController{ Codec: api.Codec, - DeploymentInterface: &testDeploymentInterface{ + DeploymentClient: &DeploymentConfigControllerDeploymentClientImpl{ GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { deployment, _ := deployutil.MakeDeployment(deploymentConfig, kapi.Codec) return deployment, nil @@ -75,23 +98,31 @@ func TestHandleConfigChangeLatestAlreadyDeployed(t *testing.T) { return nil, nil }, }, - NextDeploymentConfig: func() *deployapi.DeploymentConfig { - return deploymentConfig - }, } - controller.HandleDeploymentConfig() -} + err := controller.HandleDeploymentConfig(deploymentConfig) -type testDeploymentInterface struct { - GetDeploymentFunc func(namespace, name string) (*kapi.ReplicationController, error) - CreateDeploymentFunc func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } } -func (i *testDeploymentInterface) GetDeployment(namespace, name string) (*kapi.ReplicationController, error) { - return i.GetDeploymentFunc(namespace, name) -} +func TestHandleUpdatedDeploymentConfigError(t *testing.T) { + controller := &DeploymentConfigController{ + Codec: api.Codec, + DeploymentClient: &DeploymentConfigControllerDeploymentClientImpl{ + GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { + return nil, kerrors.NewNotFound("ReplicationController", name) + }, + CreateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + return nil, kerrors.NewInternalError(fmt.Errorf("test error")) + }, + }, + } -func (i *testDeploymentInterface) CreateDeployment(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { - return i.CreateDeploymentFunc(namespace, deployment) + err := controller.HandleDeploymentConfig(deploytest.OkDeploymentConfig(1)) + + if err == nil { + t.Fatalf("expected error") + } } diff --git a/pkg/deploy/controller/deployment_controller.go b/pkg/deploy/controller/deployment_controller.go index f04604cbda53..b08280efdbd5 100644 --- a/pkg/deploy/controller/deployment_controller.go +++ b/pkg/deploy/controller/deployment_controller.go @@ -7,7 +7,6 @@ import ( kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" kerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" - "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -22,16 +21,14 @@ import ( type DeploymentController struct { // ContainerCreator makes the container for the deployment pod based on the strategy. ContainerCreator DeploymentContainerCreator - // DeploymentInterface provides access to deployments. - DeploymentInterface dcDeploymentInterface - // PodInterface provides access to pods. - PodInterface dcPodInterface + // DeploymentClient provides access to deployments. + DeploymentClient DeploymentControllerDeploymentClient + // PodClient provides access to pods. + PodClient DeploymentControllerPodClient // NextDeployment blocks until the next deployment is available. NextDeployment func() *kapi.ReplicationController // NextPod blocks until the next pod is available. NextPod func() *kapi.Pod - // DeploymentStore is a cache of deployments. - DeploymentStore cache.Store // Environment is a set of environment which should be injected into all deployment pod // containers, in addition to whatever environment is specified by the ContainerCreator. Environment []kapi.EnvVar @@ -43,130 +40,124 @@ type DeploymentController struct { Stop <-chan struct{} } -// DeploymentContainerCreator knows how to create a deployment pod's container based on -// the deployment's strategy. -type DeploymentContainerCreator interface { - CreateContainer(*deployapi.DeploymentStrategy) *kapi.Container -} - -type dcDeploymentInterface interface { - UpdateDeployment(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) -} - -type dcPodInterface interface { - CreatePod(namespace string, pod *kapi.Pod) (*kapi.Pod, error) - DeletePod(namespace, id string) error -} - // Run begins watching and synchronizing deployment states. func (dc *DeploymentController) Run() { - go util.Until(func() { dc.HandleDeployment() }, 0, dc.Stop) - go util.Until(func() { dc.HandlePod() }, 0, dc.Stop) + go util.Until(func() { + err := dc.HandleDeployment(dc.NextDeployment()) + if err != nil { + glog.Errorf("%v", err) + } + }, 0, dc.Stop) + + go util.Until(func() { + err := dc.HandlePod(dc.NextPod()) + if err != nil { + glog.Errorf("%v", err) + } + }, 0, dc.Stop) } // HandleDeployment processes a new deployment and creates a new Pod which implements the specific // deployment behavior. The deployment and pod are correlated with annotations. If the pod was -// successfully created, the deployment's status is transitioned to pending; otherwise, the status -// is transitioned to failed. -func (dc *DeploymentController) HandleDeployment() { - deployment := dc.NextDeployment() - - if deployment.Annotations[deployapi.DeploymentStatusAnnotation] != string(deployapi.DeploymentStatusNew) { - glog.V(4).Infof("Ignoring deployment %s with non-New status", deployment.Name) - return - } +// successfully created, the deployment's status is transitioned to pending. +func (dc *DeploymentController) HandleDeployment(deployment *kapi.ReplicationController) error { + currentStatus := statusFor(deployment) + nextStatus := currentStatus - // TODO: transition to a failed state? seems like yes since this is probably not recoverable - var deploymentPod *kapi.Pod - var deploymentPodError error - if deploymentPod, deploymentPodError = dc.makeDeploymentPod(deployment); deploymentPodError != nil { - glog.V(0).Infof("Failed to make deployment pod for %s: %v", deployment.Name, deploymentPodError) - return - } + switch currentStatus { + case deployapi.DeploymentStatusNew: + deploymentPod, makeDeployerPodErr := dc.makeDeployerPod(deployment) + if makeDeployerPodErr != nil { + return fmt.Errorf("couldn't make deployer pod for %s: %v", labelForDeployment(deployment), makeDeployerPodErr) + } - nextStatus := deployment.Annotations[deployapi.DeploymentStatusAnnotation] - if pod, err := dc.PodInterface.CreatePod(deployment.Namespace, deploymentPod); err != nil { - // If the pod already exists, it's possible that a previous CreatePod succeeded but - // the deployment state update failed and now we're re-entering. - if kerrors.IsAlreadyExists(err) { - nextStatus = string(deployapi.DeploymentStatusPending) + if _, err := dc.PodClient.CreatePod(deployment.Namespace, deploymentPod); err != nil { + // If the pod already exists, it's possible that a previous CreatePod succeeded but + // the deployment state update failed and now we're re-entering. + if !kerrors.IsAlreadyExists(err) { + return fmt.Errorf("couldn't create deployer pod for %s: %v", labelForDeployment(deployment), err) + } } else { - glog.Infof("Error creating pod for deployment %s: %v", deployment.Name, err) - nextStatus = string(deployapi.DeploymentStatusFailed) + glog.V(2).Infof("Created pod %s for deployment %s", deploymentPod.Name, labelForDeployment(deployment)) } - } else { - glog.V(2).Infof("Created pod %s for deployment %s", pod.Name, deployment.Name) - deployment.Annotations[deployapi.DeploymentPodAnnotation] = pod.Name - nextStatus = string(deployapi.DeploymentStatusPending) - } - deployment.Annotations[deployapi.DeploymentStatusAnnotation] = nextStatus + deployment.Annotations[deployapi.DeploymentPodAnnotation] = deploymentPod.Name + nextStatus = deployapi.DeploymentStatusPending + case deployapi.DeploymentStatusPending, + deployapi.DeploymentStatusRunning, + deployapi.DeploymentStatusFailed: + glog.V(4).Infof("Ignoring deployment %s (status %s)", labelForDeployment(deployment), currentStatus) + case deployapi.DeploymentStatusComplete: + // Automatically clean up successful pods + // TODO: Could probably do a lookup here to skip the delete call, but it's not worth adding + // yet since (delete retries will only normally occur during full a re-sync). + podName := deployment.Annotations[deployapi.DeploymentPodAnnotation] + if err := dc.PodClient.DeletePod(deployment.Namespace, podName); err != nil { + if !kerrors.IsNotFound(err) { + return fmt.Errorf("couldn't delete completed deployer pod %s/%s for deployment %s: %v", deployment.Namespace, podName, labelForDeployment(deployment), err) + } + // Already deleted + } else { + glog.V(4).Infof("Deleted completed deployer pod %s/%s for deployment %s", deployment.Namespace, podName, labelForDeployment(deployment)) + } + } - glog.V(2).Infof("Updating deployment %s status %s -> %s", deployment.Name, deployment.Status, nextStatus) - if _, err := dc.DeploymentInterface.UpdateDeployment(deployment.Namespace, deployment); err != nil { - glog.V(2).Infof("Failed to update deployment %s: %v", deployment.Name, err) + if currentStatus != nextStatus { + deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(nextStatus) + if _, err := dc.DeploymentClient.UpdateDeployment(deployment.Namespace, deployment); err != nil { + return fmt.Errorf("Couldn't update deployment %s to status %s: %v", labelForDeployment(deployment), nextStatus, err) + } + glog.V(2).Infof("Updated deployment %s status from %s to %s", labelForDeployment(deployment), currentStatus, nextStatus) } + + return nil } // HandlePod reconciles a pod's current state with its associated deployment and updates the // deployment appropriately. -func (dc *DeploymentController) HandlePod() { - pod := dc.NextPod() - +func (dc *DeploymentController) HandlePod(pod *kapi.Pod) error { // Verify the assumption that we'll be given only pods correlated to a deployment - deploymentID, hasDeploymentID := pod.Annotations[deployapi.DeploymentAnnotation] - if !hasDeploymentID { - glog.V(2).Infof("Unexpected state: Pod %s has no deployment annotation; skipping", pod.Name) - return + deploymentName, hasDeploymentName := pod.Annotations[deployapi.DeploymentAnnotation] + if !hasDeploymentName { + glog.V(2).Infof("Ignoring pod %s; no deployment annotation found", pod.Name) + return nil } - deploymentObj, deploymentExists, err := dc.DeploymentStore.Get(&kapi.ReplicationController{ObjectMeta: kapi.ObjectMeta{Name: deploymentID, Namespace: pod.Namespace}}) - if err != nil { - glog.Errorf("Unable to retrieve deployment from store: %v", err) - return - } - if !deploymentExists { - glog.V(2).Infof("Couldn't find deployment %s associated with pod %s", deploymentID, pod.Name) - return + deployment, deploymentErr := dc.DeploymentClient.GetDeployment(pod.Namespace, deploymentName) + if deploymentErr != nil { + return fmt.Errorf("couldn't get deployment %s/%s associated with pod %s", pod.Namespace, deploymentName, pod.Name) } - deployment := deploymentObj.(*kapi.ReplicationController) - nextDeploymentStatus := deployment.Annotations[deployapi.DeploymentStatusAnnotation] + currentStatus := statusFor(deployment) + nextStatus := currentStatus switch pod.Status.Phase { case kapi.PodRunning: - nextDeploymentStatus = string(deployapi.DeploymentStatusRunning) + nextStatus = deployapi.DeploymentStatusRunning case kapi.PodSucceeded, kapi.PodFailed: - nextDeploymentStatus = string(deployapi.DeploymentStatusComplete) + nextStatus = deployapi.DeploymentStatusComplete // Detect failure based on the container state for _, info := range pod.Status.Info { if info.State.Termination != nil && info.State.Termination.ExitCode != 0 { - nextDeploymentStatus = string(deployapi.DeploymentStatusFailed) - } - } - - // Automatically clean up successful pods - if nextDeploymentStatus == string(deployapi.DeploymentStatusComplete) { - if err := dc.PodInterface.DeletePod(deployment.Namespace, pod.Name); err != nil { - glog.V(4).Infof("Couldn't delete completed pod %s for deployment %s: %#v", pod.Name, deployment.Name, err) - } else { - glog.V(4).Infof("Deleted completed pod %s for deployment %s", pod.Name, deployment.Name) + nextStatus = deployapi.DeploymentStatusFailed } } } - if deployment.Annotations[deployapi.DeploymentStatusAnnotation] != nextDeploymentStatus { - glog.V(2).Infof("Updating deployment %s status %s -> %s", deployment.Name, deployment.Annotations[deployapi.DeploymentStatusAnnotation], nextDeploymentStatus) - deployment.Annotations[deployapi.DeploymentStatusAnnotation] = nextDeploymentStatus - if _, err := dc.DeploymentInterface.UpdateDeployment(pod.Namespace, deployment); err != nil { - glog.V(2).Infof("Failed to update deployment %v: %v", deployment.Name, err) + if currentStatus != nextStatus { + deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(nextStatus) + if _, err := dc.DeploymentClient.UpdateDeployment(deployment.Namespace, deployment); err != nil { + return fmt.Errorf("couldn't update deployment %s to status %s: %v", labelForDeployment(deployment), nextStatus, err) } + glog.V(2).Infof("Updated deployment %s status from %s to %s", labelForDeployment(deployment), currentStatus, nextStatus) } + + return nil } -// makeDeploymentPod creates a pod which implements deployment behavior. The pod is correlated to +// makeDeployerPod creates a pod which implements deployment behavior. The pod is correlated to // the deployment with an annotation. -func (dc *DeploymentController) makeDeploymentPod(deployment *kapi.ReplicationController) (*kapi.Pod, error) { +func (dc *DeploymentController) makeDeployerPod(deployment *kapi.ReplicationController) (*kapi.Pod, error) { var deploymentConfig *deployapi.DeploymentConfig var decodeError error if deploymentConfig, decodeError = deployutil.DecodeDeploymentConfig(deployment, dc.Codec); decodeError != nil { @@ -186,7 +177,7 @@ func (dc *DeploymentController) makeDeploymentPod(deployment *kapi.ReplicationCo pod := &kapi.Pod{ ObjectMeta: kapi.ObjectMeta{ - GenerateName: fmt.Sprintf("deploy-%s-", deployment.Name), + GenerateName: deployutil.DeployerPodNameForDeployment(deployment), Annotations: map[string]string{ deployapi.DeploymentAnnotation: deployment.Name, }, @@ -212,3 +203,68 @@ func (dc *DeploymentController) makeDeploymentPod(deployment *kapi.ReplicationCo return pod, nil } + +// labelFor builds a string identifier for a DeploymentConfig. +func labelForDeployment(deployment *kapi.ReplicationController) string { + return fmt.Sprintf("%s/%s", deployment.Namespace, deployment.Name) +} + +// statusFor gets the DeploymentStatus for deployment from its annotations. +func statusFor(deployment *kapi.ReplicationController) deployapi.DeploymentStatus { + return deployapi.DeploymentStatus(deployment.Annotations[deployapi.DeploymentStatusAnnotation]) +} + +// DeploymentContainerCreator knows how to create a deployment pod's container based on +// the deployment's strategy. +type DeploymentContainerCreator interface { + CreateContainer(*deployapi.DeploymentStrategy) *kapi.Container +} + +// DeploymentControllerDeploymentClient abstracts access to deployments. +type DeploymentControllerDeploymentClient interface { + GetDeployment(namespace, name string) (*kapi.ReplicationController, error) + UpdateDeployment(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) +} + +// DeploymentControllerPodClient abstracts access to pods. +type DeploymentControllerPodClient interface { + CreatePod(namespace string, pod *kapi.Pod) (*kapi.Pod, error) + DeletePod(namespace, name string) error +} + +// DeploymentContainerCreatorImpl is a pluggable DeploymentContainerCreator. +type DeploymentContainerCreatorImpl struct { + CreateContainerFunc func(*deployapi.DeploymentStrategy) *kapi.Container +} + +func (i *DeploymentContainerCreatorImpl) CreateContainer(strategy *deployapi.DeploymentStrategy) *kapi.Container { + return i.CreateContainerFunc(strategy) +} + +// DeploymentControllerDeploymentClientImpl is a pluggable deploymentControllerDeploymentClient. +type DeploymentControllerDeploymentClientImpl struct { + GetDeploymentFunc func(namespace, name string) (*kapi.ReplicationController, error) + UpdateDeploymentFunc func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) +} + +func (i *DeploymentControllerDeploymentClientImpl) GetDeployment(namespace, name string) (*kapi.ReplicationController, error) { + return i.GetDeploymentFunc(namespace, name) +} + +func (i *DeploymentControllerDeploymentClientImpl) UpdateDeployment(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + return i.UpdateDeploymentFunc(namespace, deployment) +} + +// deploymentControllerPodClientImpl is a pluggable deploymentControllerPodClient. +type DeploymentControllerPodClientImpl struct { + CreatePodFunc func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) + DeletePodFunc func(namespace, name string) error +} + +func (i *DeploymentControllerPodClientImpl) CreatePod(namespace string, pod *kapi.Pod) (*kapi.Pod, error) { + return i.CreatePodFunc(namespace, pod) +} + +func (i *DeploymentControllerPodClientImpl) DeletePod(namespace, name string) error { + return i.DeletePodFunc(namespace, name) +} diff --git a/pkg/deploy/controller/deployment_controller_test.go b/pkg/deploy/controller/deployment_controller_test.go index c5e2d7ceeeb3..b6737b92e6fc 100644 --- a/pkg/deploy/controller/deployment_controller_test.go +++ b/pkg/deploy/controller/deployment_controller_test.go @@ -9,7 +9,7 @@ import ( api "github.com/openshift/origin/pkg/api/latest" deployapi "github.com/openshift/origin/pkg/deploy/api" - deploytest "github.com/openshift/origin/pkg/deploy/controller/test" + deploytest "github.com/openshift/origin/pkg/deploy/api/test" deployutil "github.com/openshift/origin/pkg/deploy/util" ) @@ -17,29 +17,24 @@ func TestHandleNewDeploymentCreatePodOk(t *testing.T) { var ( updatedDeployment *kapi.ReplicationController createdPod *kapi.Pod - expectedContainer = basicContainer() + expectedContainer = okContainer() ) controller := &DeploymentController{ Codec: api.Codec, - DeploymentInterface: &testDcDeploymentInterface{ + DeploymentClient: &DeploymentControllerDeploymentClientImpl{ UpdateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { updatedDeployment = deployment return updatedDeployment, nil }, }, - PodInterface: &testDcPodInterface{ + PodClient: &DeploymentControllerPodClientImpl{ CreatePodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) { createdPod = pod return pod, nil }, }, - NextDeployment: func() *kapi.ReplicationController { - deployment := basicDeployment() - deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(deployapi.DeploymentStatusNew) - return deployment - }, - ContainerCreator: &testContainerCreator{ + ContainerCreator: &DeploymentContainerCreatorImpl{ CreateContainerFunc: func(strategy *deployapi.DeploymentStrategy) *kapi.Container { return expectedContainer }, @@ -47,7 +42,14 @@ func TestHandleNewDeploymentCreatePodOk(t *testing.T) { } // Verify new -> pending - controller.HandleDeployment() + config := deploytest.OkDeploymentConfig(1) + deployment, _ := deployutil.MakeDeployment(config, kapi.Codec) + deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(deployapi.DeploymentStatusNew) + err := controller.HandleDeployment(deployment) + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } if updatedDeployment == nil { t.Fatalf("expected an updated deployment") @@ -93,336 +95,381 @@ func TestHandleNewDeploymentCreatePodFail(t *testing.T) { controller := &DeploymentController{ Codec: api.Codec, - DeploymentInterface: &testDcDeploymentInterface{ + DeploymentClient: &DeploymentControllerDeploymentClientImpl{ UpdateDeploymentFunc: func(namspace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { updatedDeployment = deployment return updatedDeployment, nil }, }, - PodInterface: &testDcPodInterface{ + PodClient: &DeploymentControllerPodClientImpl{ CreatePodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) { return nil, fmt.Errorf("Failed to create pod %s", pod.Name) }, }, - NextDeployment: func() *kapi.ReplicationController { - deployment := basicDeployment() - deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(deployapi.DeploymentStatusNew) - return deployment - }, - ContainerCreator: &testContainerCreator{ + ContainerCreator: &DeploymentContainerCreatorImpl{ CreateContainerFunc: func(strategy *deployapi.DeploymentStrategy) *kapi.Container { - return basicContainer() + return okContainer() }, }, } - // Verify new -> failed - controller.HandleDeployment() + config := deploytest.OkDeploymentConfig(1) + deployment, _ := deployutil.MakeDeployment(config, kapi.Codec) + deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(deployapi.DeploymentStatusNew) + err := controller.HandleDeployment(deployment) - if updatedDeployment == nil { - t.Fatalf("expected an updated deployment") - } - - if e, a := string(deployapi.DeploymentStatusFailed), updatedDeployment.Annotations[deployapi.DeploymentStatusAnnotation]; e != a { - t.Fatalf("expected updated deployment status %s, got %s", e, a) + if err == nil { + t.Fatalf("expected an error") } } func TestHandleNewDeploymentCreatePodAlreadyExists(t *testing.T) { - var updatedDeployment *kapi.ReplicationController - controller := &DeploymentController{ Codec: api.Codec, - DeploymentInterface: &testDcDeploymentInterface{ + DeploymentClient: &DeploymentControllerDeploymentClientImpl{ UpdateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { - updatedDeployment = deployment - return updatedDeployment, nil + t.Fatalf("unexpected deployment update") + return nil, nil }, }, - PodInterface: &testDcPodInterface{ + PodClient: &DeploymentControllerPodClientImpl{ CreatePodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) { - return nil, kerrors.NewAlreadyExists("pod", pod.Name) + return nil, kerrors.NewAlreadyExists("Pod", pod.Name) }, }, - NextDeployment: func() *kapi.ReplicationController { - deployment := basicDeployment() - deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(deployapi.DeploymentStatusNew) - return deployment - }, - ContainerCreator: &testContainerCreator{ + ContainerCreator: &DeploymentContainerCreatorImpl{ CreateContainerFunc: func(strategy *deployapi.DeploymentStrategy) *kapi.Container { - return basicContainer() + return okContainer() }, }, } - // Verify new -> pending - controller.HandleDeployment() - - if updatedDeployment == nil { - t.Fatalf("expected an updated deployment") - } + // Verify no-op + config := deploytest.OkDeploymentConfig(1) + deployment, _ := deployutil.MakeDeployment(config, kapi.Codec) + deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(deployapi.DeploymentStatusPending) + err := controller.HandleDeployment(deployment) - if e, a := string(deployapi.DeploymentStatusPending), updatedDeployment.Annotations[deployapi.DeploymentStatusAnnotation]; e != a { - t.Fatalf("expected updated deployment status %s, got %s", e, a) + if err != nil { + t.Fatalf("unexpected error: %v", err) } } -func TestHandleUncorrelatedPod(t *testing.T) { +func TestHandleDeploymentNoops(t *testing.T) { controller := &DeploymentController{ Codec: api.Codec, - DeploymentInterface: &testDcDeploymentInterface{ + DeploymentClient: &DeploymentControllerDeploymentClientImpl{ UpdateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { - t.Fatalf("Unexpected deployment update") + t.Fatalf("unexpected deployment update") return nil, nil }, }, - PodInterface: &testDcPodInterface{}, - NextDeployment: func() *kapi.ReplicationController { return nil }, - NextPod: func() *kapi.Pod { - pod := runningPod() - pod.Annotations = make(map[string]string) - return pod + PodClient: &DeploymentControllerPodClientImpl{ + CreatePodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) { + t.Fatalf("unexpected call to create pod") + return nil, nil + }, + }, + ContainerCreator: &DeploymentContainerCreatorImpl{ + CreateContainerFunc: func(strategy *deployapi.DeploymentStrategy) *kapi.Container { + t.Fatalf("unexpected call to create container") + return nil + }, }, - DeploymentStore: deploytest.NewFakeDeploymentStore(pendingDeployment()), } // Verify no-op - controller.HandlePod() + config := deploytest.OkDeploymentConfig(1) + deployment, _ := deployutil.MakeDeployment(config, kapi.Codec) + + noopStatus := []deployapi.DeploymentStatus{ + deployapi.DeploymentStatusPending, + deployapi.DeploymentStatusRunning, + deployapi.DeploymentStatusFailed, + } + for _, status := range noopStatus { + deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(status) + err := controller.HandleDeployment(deployment) + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + } } -func TestHandleOrphanedPod(t *testing.T) { +func TestHandleDeploymentPodCleanupOk(t *testing.T) { + podName := "pod" + deletedPodName := "" + deletedPodNamespace := "" + controller := &DeploymentController{ Codec: api.Codec, - DeploymentInterface: &testDcDeploymentInterface{ + DeploymentClient: &DeploymentControllerDeploymentClientImpl{ UpdateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { - t.Fatalf("Unexpected deployment update") + t.Fatalf("unexpected deployment update") return nil, nil }, }, - PodInterface: &testDcPodInterface{}, - NextDeployment: func() *kapi.ReplicationController { return nil }, - NextPod: func() *kapi.Pod { return runningPod() }, - DeploymentStore: deploytest.NewFakeDeploymentStore(nil), + PodClient: &DeploymentControllerPodClientImpl{ + CreatePodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) { + t.Fatalf("unexpected call to create pod") + return nil, nil + }, + DeletePodFunc: func(namespace, name string) error { + deletedPodNamespace = namespace + deletedPodName = name + return nil + }, + }, + ContainerCreator: &DeploymentContainerCreatorImpl{ + CreateContainerFunc: func(strategy *deployapi.DeploymentStrategy) *kapi.Container { + t.Fatalf("unexpected call to create container") + return nil + }, + }, } - // Verify no-op - controller.HandlePod() -} + // Verify successful cleanup + config := deploytest.OkDeploymentConfig(1) + deployment, _ := deployutil.MakeDeployment(config, kapi.Codec) + deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(deployapi.DeploymentStatusComplete) + deployment.Annotations[deployapi.DeploymentPodAnnotation] = podName + err := controller.HandleDeployment(deployment) -func TestHandlePodRunning(t *testing.T) { - var updatedDeployment *kapi.ReplicationController + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if e, a := deployment.Namespace, deletedPodNamespace; e != a { + t.Fatalf("expected deleted pod namespace %s, got %s", e, a) + } + + if e, a := podName, deletedPodName; e != a { + t.Fatalf("expected deleted pod name %s, got %s", e, a) + } +} + +func TestHandleDeploymentPodCleanupNoop(t *testing.T) { controller := &DeploymentController{ Codec: api.Codec, - DeploymentInterface: &testDcDeploymentInterface{ + DeploymentClient: &DeploymentControllerDeploymentClientImpl{ UpdateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { - updatedDeployment = deployment - return deployment, nil + t.Fatalf("unexpected deployment update") + return nil, nil + }, + }, + PodClient: &DeploymentControllerPodClientImpl{ + CreatePodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) { + t.Fatalf("unexpected call to create pod") + return nil, nil + }, + DeletePodFunc: func(namespace, name string) error { + return kerrors.NewNotFound("Pod", name) }, }, - PodInterface: &testDcPodInterface{}, - NextDeployment: func() *kapi.ReplicationController { - return nil + ContainerCreator: &DeploymentContainerCreatorImpl{ + CreateContainerFunc: func(strategy *deployapi.DeploymentStrategy) *kapi.Container { + t.Fatalf("unexpected call to create container") + return nil + }, }, - NextPod: func() *kapi.Pod { return runningPod() }, - DeploymentStore: deploytest.NewFakeDeploymentStore(pendingDeployment()), } - controller.HandlePod() - - if updatedDeployment == nil { - t.Fatalf("Expected a deployment to be updated") - } + // Verify no-op + config := deploytest.OkDeploymentConfig(1) + deployment, _ := deployutil.MakeDeployment(config, kapi.Codec) + deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(deployapi.DeploymentStatusComplete) + deployment.Annotations[deployapi.DeploymentPodAnnotation] = "pod" + err := controller.HandleDeployment(deployment) - if e, a := string(deployapi.DeploymentStatusRunning), updatedDeployment.Annotations[deployapi.DeploymentStatusAnnotation]; e != a { - t.Fatalf("expected updated deployment status %s, got %s", e, a) + if err != nil { + t.Fatalf("unexpected error: %v", err) } } -func TestHandlePodTerminatedOk(t *testing.T) { - var updatedDeployment *kapi.ReplicationController - var deletedPodID string - +func TestHandleDeploymentPodCleanupFailure(t *testing.T) { controller := &DeploymentController{ Codec: api.Codec, - DeploymentInterface: &testDcDeploymentInterface{ + DeploymentClient: &DeploymentControllerDeploymentClientImpl{ UpdateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { - updatedDeployment = deployment - return deployment, nil + t.Fatalf("unexpected deployment update") + return nil, nil }, }, - PodInterface: &testDcPodInterface{ + PodClient: &DeploymentControllerPodClientImpl{ + CreatePodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) { + t.Fatalf("unexpected call to create pod") + return nil, nil + }, DeletePodFunc: func(namespace, name string) error { - deletedPodID = name + return kerrors.NewInternalError(fmt.Errorf("test error")) + }, + }, + ContainerCreator: &DeploymentContainerCreatorImpl{ + CreateContainerFunc: func(strategy *deployapi.DeploymentStrategy) *kapi.Container { + t.Fatalf("unexpected call to create container") return nil }, }, - NextDeployment: func() *kapi.ReplicationController { return nil }, - NextPod: func() *kapi.Pod { return succeededPod() }, - DeploymentStore: deploytest.NewFakeDeploymentStore(runningDeployment()), } - controller.HandlePod() + // Verify error + config := deploytest.OkDeploymentConfig(1) + deployment, _ := deployutil.MakeDeployment(config, kapi.Codec) + deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(deployapi.DeploymentStatusComplete) + deployment.Annotations[deployapi.DeploymentPodAnnotation] = "pod" + err := controller.HandleDeployment(deployment) - if updatedDeployment == nil { - t.Fatalf("Expected a deployment to be updated") + if err == nil { + t.Fatalf("expected an error") } +} - if e, a := string(deployapi.DeploymentStatusComplete), updatedDeployment.Annotations[deployapi.DeploymentStatusAnnotation]; e != a { - t.Fatalf("expected updated deployment status %s, got %s", e, a) +func TestHandleUncorrelatedPod(t *testing.T) { + controller := &DeploymentController{ + Codec: api.Codec, + DeploymentClient: &DeploymentControllerDeploymentClientImpl{ + UpdateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + t.Fatalf("unexpected deployment update") + return nil, nil + }, + }, } - if len(deletedPodID) == 0 { - t.Fatalf("expected pod to be deleted") + // Verify no-op + pod := runningPod() + pod.Annotations = make(map[string]string) + err := controller.HandlePod(pod) + + if err != nil { + t.Fatalf("unexpected err: %v", err) } } -func TestHandlePodTerminatedNotOk(t *testing.T) { +func TestHandleOrphanedPod(t *testing.T) { + controller := &DeploymentController{ + Codec: api.Codec, + DeploymentClient: &DeploymentControllerDeploymentClientImpl{ + UpdateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + t.Fatalf("Unexpected deployment update") + return nil, nil + }, + GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { + return nil, kerrors.NewNotFound("ReplicationController", name) + }, + }, + } + + err := controller.HandlePod(runningPod()) + + if err == nil { + t.Fatalf("expected an error") + } +} + +func TestHandlePodRunning(t *testing.T) { var updatedDeployment *kapi.ReplicationController controller := &DeploymentController{ Codec: api.Codec, - DeploymentInterface: &testDcDeploymentInterface{ + DeploymentClient: &DeploymentControllerDeploymentClientImpl{ + GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { + config := deploytest.OkDeploymentConfig(1) + deployment, _ := deployutil.MakeDeployment(config, kapi.Codec) + deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(deployapi.DeploymentStatusPending) + return deployment, nil + }, UpdateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { updatedDeployment = deployment return deployment, nil }, }, - PodInterface: &testDcPodInterface{ - DeletePodFunc: func(namespace, name string) error { - t.Fatalf("unexpected delete of pod %s", name) - return nil - }, - }, - ContainerCreator: &testContainerCreator{ - CreateContainerFunc: func(strategy *deployapi.DeploymentStrategy) *kapi.Container { - return basicContainer() - }, - }, - NextDeployment: func() *kapi.ReplicationController { return nil }, - NextPod: func() *kapi.Pod { return failedPod() }, - DeploymentStore: deploytest.NewFakeDeploymentStore(runningDeployment()), } - controller.HandlePod() + err := controller.HandlePod(runningPod()) + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } if updatedDeployment == nil { - t.Fatalf("Expected a deployment to be updated") + t.Fatalf("expected deployment update") } - if e, a := string(deployapi.DeploymentStatusFailed), updatedDeployment.Annotations[deployapi.DeploymentStatusAnnotation]; e != a { + if e, a := deployapi.DeploymentStatusRunning, statusFor(updatedDeployment); e != a { t.Fatalf("expected updated deployment status %s, got %s", e, a) } } -type testContainerCreator struct { - CreateContainerFunc func(strategy *deployapi.DeploymentStrategy) *kapi.Container -} +func TestHandlePodTerminatedOk(t *testing.T) { + var updatedDeployment *kapi.ReplicationController -func (t *testContainerCreator) CreateContainer(strategy *deployapi.DeploymentStrategy) *kapi.Container { - return t.CreateContainerFunc(strategy) -} + controller := &DeploymentController{ + Codec: api.Codec, + DeploymentClient: &DeploymentControllerDeploymentClientImpl{ + GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { + config := deploytest.OkDeploymentConfig(1) + deployment, _ := deployutil.MakeDeployment(config, kapi.Codec) + deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(deployapi.DeploymentStatusRunning) + return deployment, nil + }, + UpdateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + updatedDeployment = deployment + return deployment, nil + }, + }, + } -type testDcDeploymentInterface struct { - UpdateDeploymentFunc func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) -} + err := controller.HandlePod(succeededPod()) -func (i *testDcDeploymentInterface) UpdateDeployment(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { - return i.UpdateDeploymentFunc(namespace, deployment) -} + if err != nil { + t.Fatalf("unexpected error: %v", err) + } -type testDcPodInterface struct { - CreatePodFunc func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) - DeletePodFunc func(namespace, name string) error -} + if updatedDeployment == nil { + t.Fatalf("expected deployment update") + } -func (i *testDcPodInterface) CreatePod(namespace string, pod *kapi.Pod) (*kapi.Pod, error) { - return i.CreatePodFunc(namespace, pod) + if e, a := deployapi.DeploymentStatusComplete, statusFor(updatedDeployment); e != a { + t.Fatalf("expected updated deployment status %s, got %s", e, a) + } } -func (i *testDcPodInterface) DeletePod(namespace, name string) error { - return i.DeletePodFunc(namespace, name) -} +func TestHandlePodTerminatedNotOk(t *testing.T) { + var updatedDeployment *kapi.ReplicationController -func basicDeploymentConfig() *deployapi.DeploymentConfig { - return &deployapi.DeploymentConfig{ - ObjectMeta: kapi.ObjectMeta{Name: "deploy1"}, - Triggers: []deployapi.DeploymentTriggerPolicy{ - { - Type: deployapi.DeploymentTriggerManual, + controller := &DeploymentController{ + Codec: api.Codec, + DeploymentClient: &DeploymentControllerDeploymentClientImpl{ + GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { + config := deploytest.OkDeploymentConfig(1) + deployment, _ := deployutil.MakeDeployment(config, kapi.Codec) + deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(deployapi.DeploymentStatusRunning) + return deployment, nil }, - }, - Template: deployapi.DeploymentTemplate{ - Strategy: deployapi.DeploymentStrategy{ - Type: deployapi.DeploymentStrategyTypeRecreate, - }, - ControllerTemplate: kapi.ReplicationControllerSpec{ - Replicas: 1, - Selector: map[string]string{ - "name": "test-pod", - }, - Template: &kapi.PodTemplateSpec{ - ObjectMeta: kapi.ObjectMeta{ - Labels: map[string]string{ - "name": "test-pod", - }, - }, - Spec: kapi.PodSpec{ - Containers: []kapi.Container{ - { - Name: "container-1", - Image: "registry:8080/openshift/test-image:ref-1", - }, - }, - }, - }, + UpdateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + updatedDeployment = deployment + return deployment, nil }, }, } -} -func basicDeployment() *kapi.ReplicationController { - config := basicDeploymentConfig() - encodedConfig, _ := deployutil.EncodeDeploymentConfig(config, api.Codec) - return &kapi.ReplicationController{ - ObjectMeta: kapi.ObjectMeta{ - Name: "deploy1", - Annotations: map[string]string{ - deployapi.DeploymentConfigAnnotation: config.Name, - deployapi.DeploymentStatusAnnotation: string(deployapi.DeploymentStatusNew), - deployapi.DeploymentEncodedConfigAnnotation: encodedConfig, - }, - Labels: config.Labels, - }, - Spec: kapi.ReplicationControllerSpec{ - Template: &kapi.PodTemplateSpec{ - Spec: kapi.PodSpec{ - Containers: []kapi.Container{ - { - Name: "container1", - Image: "registry:8080/repo1:ref1", - }, - }, - }, - }, - }, + err := controller.HandlePod(failedPod()) + + if err != nil { + t.Fatalf("unexpected error: %v", err) } -} -func pendingDeployment() *kapi.ReplicationController { - d := basicDeployment() - d.Annotations[deployapi.DeploymentStatusAnnotation] = string(deployapi.DeploymentStatusPending) - return d -} + if updatedDeployment == nil { + t.Fatalf("expected deployment update") + } -func runningDeployment() *kapi.ReplicationController { - d := basicDeployment() - d.Annotations[deployapi.DeploymentStatusAnnotation] = string(deployapi.DeploymentStatusRunning) - return d + if e, a := deployapi.DeploymentStatusFailed, statusFor(updatedDeployment); e != a { + t.Fatalf("expected updated deployment status %s, got %s", e, a) + } } -func basicContainer() *kapi.Container { +func okContainer() *kapi.Container { return &kapi.Container{ Image: "test/image", Command: []string{"command"}, @@ -435,7 +482,7 @@ func basicContainer() *kapi.Container { } } -func basicPod() *kapi.Pod { +func okPod() *kapi.Pod { return &kapi.Pod{ ObjectMeta: kapi.ObjectMeta{ Name: "deploy-deploy1", @@ -452,13 +499,13 @@ func basicPod() *kapi.Pod { } func succeededPod() *kapi.Pod { - p := basicPod() + p := okPod() p.Status.Phase = kapi.PodSucceeded return p } func failedPod() *kapi.Pod { - p := basicPod() + p := okPod() p.Status.Phase = kapi.PodFailed p.Status.Info["container1"] = kapi.ContainerStatus{ State: kapi.ContainerState{ @@ -471,7 +518,7 @@ func failedPod() *kapi.Pod { } func runningPod() *kapi.Pod { - p := basicPod() + p := okPod() p.Status.Phase = kapi.PodRunning return p } diff --git a/pkg/deploy/controller/factory/factory.go b/pkg/deploy/controller/factory/factory.go index 552b6f3ac291..3a7af8059857 100644 --- a/pkg/deploy/controller/factory/factory.go +++ b/pkg/deploy/controller/factory/factory.go @@ -6,6 +6,7 @@ import ( "github.com/golang/glog" kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + kerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" kclient "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" @@ -15,6 +16,7 @@ import ( osclient "github.com/openshift/origin/pkg/client" deployapi "github.com/openshift/origin/pkg/deploy/api" controller "github.com/openshift/origin/pkg/deploy/controller" + deployutil "github.com/openshift/origin/pkg/deploy/util" imageapi "github.com/openshift/origin/pkg/image/api" ) @@ -28,11 +30,26 @@ type DeploymentConfigControllerFactory struct { } func (factory *DeploymentConfigControllerFactory) Create() *controller.DeploymentConfigController { + deploymentConfigLW := &deployutil.ListWatcherImpl{ + ListFunc: func() (runtime.Object, error) { + return factory.Client.DeploymentConfigs(kapi.NamespaceAll).List(labels.Everything(), labels.Everything()) + }, + WatchFunc: func(resourceVersion string) (watch.Interface, error) { + return factory.Client.DeploymentConfigs(kapi.NamespaceAll).Watch(labels.Everything(), labels.Everything(), resourceVersion) + }, + } queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc) - cache.NewReflector(&deploymentConfigLW{factory.Client}, &deployapi.DeploymentConfig{}, queue).RunUntil(factory.Stop) + cache.NewReflector(deploymentConfigLW, &deployapi.DeploymentConfig{}, queue).RunUntil(factory.Stop) return &controller.DeploymentConfigController{ - DeploymentInterface: &ClientDeploymentInterface{factory.KubeClient}, + DeploymentClient: &controller.DeploymentConfigControllerDeploymentClientImpl{ + GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { + return factory.KubeClient.ReplicationControllers(namespace).Get(name) + }, + CreateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + return factory.KubeClient.ReplicationControllers(namespace).Create(deployment) + }, + }, NextDeploymentConfig: func() *deployapi.DeploymentConfig { config := queue.Pop().(*deployapi.DeploymentConfig) panicIfStopped(factory.Stop, "deployment config controller stopped") @@ -61,17 +78,22 @@ type DeploymentControllerFactory struct { Codec runtime.Codec // Stop may be set to allow controllers created by this factory to be terminated. Stop <-chan struct{} - - // deploymentStore is maintained on the factory to support narrowing of the pod polling scope. - deploymentStore cache.Store } func (factory *DeploymentControllerFactory) Create() *controller.DeploymentController { + deploymentLW := &deployutil.ListWatcherImpl{ + ListFunc: func() (runtime.Object, error) { + return factory.KubeClient.ReplicationControllers(kapi.NamespaceAll).List(labels.Everything()) + }, + WatchFunc: func(resourceVersion string) (watch.Interface, error) { + return factory.KubeClient.ReplicationControllers(kapi.NamespaceAll).Watch(labels.Everything(), labels.Everything(), resourceVersion) + }, + } deploymentQueue := cache.NewFIFO(cache.MetaNamespaceKeyFunc) - cache.NewReflector(&deploymentLW{client: factory.KubeClient, field: labels.Everything()}, &kapi.ReplicationController{}, deploymentQueue).RunUntil(factory.Stop) + cache.NewReflector(deploymentLW, &kapi.ReplicationController{}, deploymentQueue).RunUntil(factory.Stop) - factory.deploymentStore = cache.NewStore(cache.MetaNamespaceKeyFunc) - cache.NewReflector(&deploymentLW{client: factory.KubeClient, field: labels.Everything()}, &kapi.ReplicationController{}, factory.deploymentStore).RunUntil(factory.Stop) + deploymentStore := cache.NewStore(cache.MetaNamespaceKeyFunc) + cache.NewReflector(deploymentLW, &kapi.ReplicationController{}, deploymentStore).RunUntil(factory.Stop) // Kubernetes does not currently synchronize Pod status in storage with a Pod's container // states. Because of this, we can't receive events related to container (and thus Pod) @@ -82,13 +104,44 @@ func (factory *DeploymentControllerFactory) Create() *controller.DeploymentContr // TODO: Find a way to get watch events for Pod/container status updates. The polling // strategy is horribly inefficient and should be addressed upstream somehow. podQueue := cache.NewFIFO(cache.MetaNamespaceKeyFunc) - cache.NewPoller(factory.pollPods, 10*time.Second, podQueue).RunUntil(factory.Stop) + pollFunc := func() (cache.Enumerator, error) { + return pollPods(deploymentStore, factory.KubeClient) + } + cache.NewPoller(pollFunc, 10*time.Second, podQueue).RunUntil(factory.Stop) return &controller.DeploymentController{ - ContainerCreator: factory, - DeploymentInterface: &ClientDeploymentInterface{factory.KubeClient}, - PodInterface: &DeploymentControllerPodInterface{factory.KubeClient}, - Environment: factory.Environment, + ContainerCreator: &defaultContainerCreator{factory.RecreateStrategyImage}, + DeploymentClient: &controller.DeploymentControllerDeploymentClientImpl{ + // Since we need to use a deployment cache to support the pod poller, go ahead and use + // it for other deployment lookups and maintain the usual REST API for not-found errors. + GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { + example := &kapi.ReplicationController{ + ObjectMeta: kapi.ObjectMeta{ + Namespace: namespace, + Name: name, + }} + obj, exists, err := deploymentStore.Get(example) + if !exists { + return nil, kerrors.NewNotFound(example.Kind, name) + } + if err != nil { + return nil, err + } + return obj.(*kapi.ReplicationController), nil + }, + UpdateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + return factory.KubeClient.ReplicationControllers(namespace).Update(deployment) + }, + }, + PodClient: &controller.DeploymentControllerPodClientImpl{ + CreatePodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) { + return factory.KubeClient.Pods(namespace).Create(pod) + }, + DeletePodFunc: func(namespace, name string) error { + return factory.KubeClient.Pods(namespace).Delete(name) + }, + }, + Environment: factory.Environment, NextDeployment: func() *kapi.ReplicationController { deployment := deploymentQueue.Pop().(*kapi.ReplicationController) panicIfStopped(factory.Stop, "deployment controller stopped") @@ -99,22 +152,25 @@ func (factory *DeploymentControllerFactory) Create() *controller.DeploymentContr panicIfStopped(factory.Stop, "deployment controller stopped") return pod }, - DeploymentStore: factory.deploymentStore, - UseLocalImages: factory.UseLocalImages, - Codec: factory.Codec, - Stop: factory.Stop, + UseLocalImages: factory.UseLocalImages, + Codec: factory.Codec, + Stop: factory.Stop, } } -// CreateContainer lets DeploymentControllerFactory satisfy the DeploymentContainerCreator interface -// and makes a container using the configuration of the factory. -func (factory *DeploymentControllerFactory) CreateContainer(strategy *deployapi.DeploymentStrategy) *kapi.Container { +// CreateContainer is the default DeploymentContainerCreator. It makes containers using only +// the input strategy parameters and a user defined image for the Recreate strategy. +type defaultContainerCreator struct { + recreateStrategyImage string +} + +func (c *defaultContainerCreator) CreateContainer(strategy *deployapi.DeploymentStrategy) *kapi.Container { // Every strategy type should be handled here. switch strategy.Type { case deployapi.DeploymentStrategyTypeRecreate: // Use the factory-configured image. return &kapi.Container{ - Image: factory.RecreateStrategyImage, + Image: c.recreateStrategyImage, } case deployapi.DeploymentStrategyTypeCustom: // Use user-defined values from the strategy input. @@ -131,10 +187,10 @@ func (factory *DeploymentControllerFactory) CreateContainer(strategy *deployapi. // pollPods lists all pods associated with pending or running deployments and returns // a cache.Enumerator suitable for use with a cache.Poller. -func (factory *DeploymentControllerFactory) pollPods() (cache.Enumerator, error) { +func pollPods(deploymentStore cache.Store, kClient kclient.PodsNamespacer) (cache.Enumerator, error) { list := &kapi.PodList{} - for _, obj := range factory.deploymentStore.List() { + for _, obj := range deploymentStore.List() { deployment := obj.(*kapi.ReplicationController) switch deployapi.DeploymentStatus(deployment.Annotations[deployapi.DeploymentStatusAnnotation]) { @@ -146,7 +202,7 @@ func (factory *DeploymentControllerFactory) pollPods() (cache.Enumerator, error) continue } - pod, err := factory.KubeClient.Pods(deployment.Namespace).Get(podID) + pod, err := kClient.Pods(deployment.Namespace).Get(podID) if err != nil { glog.V(2).Infof("Couldn't find pod %s for deployment %s: %#v", podID, deployment.Name, err) continue @@ -159,18 +215,6 @@ func (factory *DeploymentControllerFactory) pollPods() (cache.Enumerator, error) return &podEnumerator{list}, nil } -type DeploymentControllerPodInterface struct { - KubeClient kclient.Interface -} - -func (i DeploymentControllerPodInterface) CreatePod(namespace string, pod *kapi.Pod) (*kapi.Pod, error) { - return i.KubeClient.Pods(namespace).Create(pod) -} - -func (i DeploymentControllerPodInterface) DeletePod(namespace, id string) error { - return i.KubeClient.Pods(namespace).Delete(id) -} - // podEnumerator allows a cache.Poller to enumerate items in an api.PodList type podEnumerator struct { *kapi.PodList @@ -200,22 +244,36 @@ type DeploymentConfigChangeControllerFactory struct { } func (factory *DeploymentConfigChangeControllerFactory) Create() *controller.DeploymentConfigChangeController { + deploymentConfigLW := &deployutil.ListWatcherImpl{ + ListFunc: func() (runtime.Object, error) { + return factory.Client.DeploymentConfigs(kapi.NamespaceAll).List(labels.Everything(), labels.Everything()) + }, + WatchFunc: func(resourceVersion string) (watch.Interface, error) { + return factory.Client.DeploymentConfigs(kapi.NamespaceAll).Watch(labels.Everything(), labels.Everything(), resourceVersion) + }, + } queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc) - cache.NewReflector(&deploymentConfigLW{factory.Client}, &deployapi.DeploymentConfig{}, queue).RunUntil(factory.Stop) - - store := cache.NewStore(cache.MetaNamespaceKeyFunc) - cache.NewReflector(&deploymentLW{client: factory.KubeClient, field: labels.Everything()}, &kapi.ReplicationController{}, store).RunUntil(factory.Stop) + cache.NewReflector(deploymentConfigLW, &deployapi.DeploymentConfig{}, queue).RunUntil(factory.Stop) return &controller.DeploymentConfigChangeController{ - ChangeStrategy: &ClientDeploymentConfigInterface{factory.Client}, + ChangeStrategy: &controller.ChangeStrategyImpl{ + GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { + return factory.KubeClient.ReplicationControllers(namespace).Get(name) + }, + GenerateDeploymentConfigFunc: func(namespace, name string) (*deployapi.DeploymentConfig, error) { + return factory.Client.DeploymentConfigs(namespace).Generate(name) + }, + UpdateDeploymentConfigFunc: func(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { + return factory.Client.DeploymentConfigs(namespace).Update(config) + }, + }, NextDeploymentConfig: func() *deployapi.DeploymentConfig { config := queue.Pop().(*deployapi.DeploymentConfig) panicIfStopped(factory.Stop, "deployment config change controller stopped") return config }, - DeploymentStore: store, - Codec: factory.Codec, - Stop: factory.Stop, + Codec: factory.Codec, + Stop: factory.Stop, } } @@ -228,15 +286,45 @@ type ImageChangeControllerFactory struct { } func (factory *ImageChangeControllerFactory) Create() *controller.ImageChangeController { + imageRepositoryLW := &deployutil.ListWatcherImpl{ + ListFunc: func() (runtime.Object, error) { + return factory.Client.ImageRepositories(kapi.NamespaceAll).List(labels.Everything(), labels.Everything()) + }, + WatchFunc: func(resourceVersion string) (watch.Interface, error) { + return factory.Client.ImageRepositories(kapi.NamespaceAll).Watch(labels.Everything(), labels.Everything(), resourceVersion) + }, + } queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc) - cache.NewReflector(&imageRepositoryLW{factory.Client}, &imageapi.ImageRepository{}, queue).RunUntil(factory.Stop) + cache.NewReflector(imageRepositoryLW, &imageapi.ImageRepository{}, queue).RunUntil(factory.Stop) + deploymentConfigLW := &deployutil.ListWatcherImpl{ + ListFunc: func() (runtime.Object, error) { + return factory.Client.DeploymentConfigs(kapi.NamespaceAll).List(labels.Everything(), labels.Everything()) + }, + WatchFunc: func(resourceVersion string) (watch.Interface, error) { + return factory.Client.DeploymentConfigs(kapi.NamespaceAll).Watch(labels.Everything(), labels.Everything(), resourceVersion) + }, + } store := cache.NewStore(cache.MetaNamespaceKeyFunc) - cache.NewReflector(&deploymentConfigLW{factory.Client}, &deployapi.DeploymentConfig{}, store).RunUntil(factory.Stop) + cache.NewReflector(deploymentConfigLW, &deployapi.DeploymentConfig{}, store).RunUntil(factory.Stop) return &controller.ImageChangeController{ - DeploymentConfigInterface: &ClientDeploymentConfigInterface{factory.Client}, - DeploymentConfigStore: store, + DeploymentConfigClient: &controller.ImageChangeControllerDeploymentConfigClientImpl{ + ListDeploymentConfigsFunc: func() ([]*deployapi.DeploymentConfig, error) { + configs := []*deployapi.DeploymentConfig{} + objs := store.List() + for _, obj := range objs { + configs = append(configs, obj.(*deployapi.DeploymentConfig)) + } + return configs, nil + }, + GenerateDeploymentConfigFunc: func(namespace, name string) (*deployapi.DeploymentConfig, error) { + return factory.Client.DeploymentConfigs(namespace).Generate(name) + }, + UpdateDeploymentConfigFunc: func(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { + return factory.Client.DeploymentConfigs(namespace).Update(config) + }, + }, NextImageRepository: func() *imageapi.ImageRepository { repo := queue.Pop().(*imageapi.ImageRepository) panicIfStopped(factory.Stop, "deployment config change controller stopped") @@ -254,84 +342,3 @@ func panicIfStopped(ch <-chan struct{}, message interface{}) { default: } } - -// deploymentLW is a ListWatcher implementation for Deployments. -type deploymentLW struct { - client kclient.Interface - field labels.Selector -} - -// List lists all Deployments which match the given field selector. -func (lw *deploymentLW) List() (runtime.Object, error) { - return lw.client.ReplicationControllers(kapi.NamespaceAll).List(labels.Everything()) -} - -// Watch watches all Deployments matching the given field selector. -func (lw *deploymentLW) Watch(resourceVersion string) (watch.Interface, error) { - return lw.client.ReplicationControllers(kapi.NamespaceAll).Watch(labels.Everything(), lw.field, resourceVersion) -} - -// deploymentConfigLW is a ListWatcher implementation for DeploymentConfigs. -type deploymentConfigLW struct { - client osclient.Interface -} - -// List lists all DeploymentConfigs. -func (lw *deploymentConfigLW) List() (runtime.Object, error) { - return lw.client.DeploymentConfigs(kapi.NamespaceAll).List(labels.Everything(), labels.Everything()) -} - -// Watch watches all DeploymentConfigs. -func (lw *deploymentConfigLW) Watch(resourceVersion string) (watch.Interface, error) { - return lw.client.DeploymentConfigs(kapi.NamespaceAll).Watch(labels.Everything(), labels.Everything(), resourceVersion) -} - -// imageRepositoryLW is a ListWatcher for ImageRepositories. -type imageRepositoryLW struct { - client osclient.Interface -} - -// List lists all ImageRepositories. -func (lw *imageRepositoryLW) List() (runtime.Object, error) { - return lw.client.ImageRepositories(kapi.NamespaceAll).List(labels.Everything(), labels.Everything()) -} - -// Watch watches all ImageRepositories. -func (lw *imageRepositoryLW) Watch(resourceVersion string) (watch.Interface, error) { - return lw.client.ImageRepositories(kapi.NamespaceAll).Watch(labels.Everything(), labels.Everything(), resourceVersion) -} - -// ClientDeploymentInterface is a dccDeploymentInterface and dcDeploymentInterface which delegates to the OpenShift client interfaces -type ClientDeploymentInterface struct { - Client kclient.Interface -} - -// GetDeployment returns deployment using OpenShift client. -func (c ClientDeploymentInterface) GetDeployment(namespace, name string) (*kapi.ReplicationController, error) { - return c.Client.ReplicationControllers(namespace).Get(name) -} - -// CreateDeployment creates deployment using OpenShift client. -func (c ClientDeploymentInterface) CreateDeployment(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { - return c.Client.ReplicationControllers(namespace).Create(deployment) -} - -// UpdateDeployment creates deployment using OpenShift client. -func (c ClientDeploymentInterface) UpdateDeployment(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { - return c.Client.ReplicationControllers(namespace).Update(deployment) -} - -// ClientDeploymentConfigInterface is a changeStrategy which delegates to the OpenShift client interfaces -type ClientDeploymentConfigInterface struct { - Client osclient.Interface -} - -// GenerateDeploymentConfig generates deploymentConfig using OpenShift client. -func (c ClientDeploymentConfigInterface) GenerateDeploymentConfig(namespace, name string) (*deployapi.DeploymentConfig, error) { - return c.Client.DeploymentConfigs(namespace).Generate(name) -} - -// UpdateDeploymentConfig creates deploymentConfig using OpenShift client. -func (c ClientDeploymentConfigInterface) UpdateDeploymentConfig(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { - return c.Client.DeploymentConfigs(namespace).Update(config) -} diff --git a/pkg/deploy/controller/image_change_controller.go b/pkg/deploy/controller/image_change_controller.go index 3e72b8d65720..d6c8b719504f 100644 --- a/pkg/deploy/controller/image_change_controller.go +++ b/pkg/deploy/controller/image_change_controller.go @@ -5,7 +5,6 @@ import ( "github.com/golang/glog" - "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" deployapi "github.com/openshift/origin/pkg/deploy/api" @@ -16,32 +15,34 @@ import ( // DeploymentConfigs when a new version of a tag referenced by a DeploymentConfig // is available. type ImageChangeController struct { - DeploymentConfigInterface icDeploymentConfigInterface - NextImageRepository func() *imageapi.ImageRepository - DeploymentConfigStore cache.Store + DeploymentConfigClient ImageChangeControllerDeploymentConfigClient + NextImageRepository func() *imageapi.ImageRepository // Stop is an optional channel that controls when the controller exits Stop <-chan struct{} } -type icDeploymentConfigInterface interface { - UpdateDeploymentConfig(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) - GenerateDeploymentConfig(namespace, name string) (*deployapi.DeploymentConfig, error) -} - // Run processes ImageRepository events one by one. func (c *ImageChangeController) Run() { - go util.Until(c.HandleImageRepo, 0, c.Stop) + go util.Until(func() { + err := c.HandleImageRepo(c.NextImageRepository()) + if err != nil { + glog.Errorf("%v", err) + } + }, 0, c.Stop) } // HandleImageRepo processes the next ImageRepository event. -func (c *ImageChangeController) HandleImageRepo() { - imageRepo := c.NextImageRepository() +func (c *ImageChangeController) HandleImageRepo(imageRepo *imageapi.ImageRepository) error { configsToGenerate := []*deployapi.DeploymentConfig{} firedTriggersForConfig := make(map[string][]deployapi.DeploymentTriggerImageChangeParams) - for _, c := range c.DeploymentConfigStore.List() { - config := c.(*deployapi.DeploymentConfig) - glog.V(4).Infof("Detecting changed images for deploymentConfig %s", config.Name) + configs, err := c.DeploymentConfigClient.ListDeploymentConfigs() + if err != nil { + return fmt.Errorf("couldn't get list of deploymentConfigs while handling imageRepo %s: %v", labelForRepo(imageRepo), err) + } + + for _, config := range configs { + glog.V(4).Infof("Detecting changed images for deploymentConfig %s", labelFor(config)) // Extract relevant triggers for this imageRepo for this config triggersForConfig := []deployapi.DeploymentTriggerImageChangeParams{} @@ -51,13 +52,13 @@ func (c *ImageChangeController) HandleImageRepo() { continue } if triggerMatchesImage(config, trigger.ImageChangeParams, imageRepo) { - glog.V(4).Infof("Found matching %s trigger for deploymentConfig %s: %#v", trigger.Type, config.Name, trigger.ImageChangeParams) + glog.V(4).Infof("Found matching %s trigger for deploymentConfig %s: %#v", trigger.Type, labelFor(config), trigger.ImageChangeParams) triggersForConfig = append(triggersForConfig, *trigger.ImageChangeParams) } } for _, params := range triggersForConfig { - glog.V(4).Infof("Processing image triggers for deploymentConfig %s", config.Name) + glog.V(4).Infof("Processing image triggers for deploymentConfig %s", labelFor(config)) containerNames := util.NewStringSet(params.ContainerNames...) for _, container := range config.Template.ControllerTemplate.Template.Spec.Containers { if !containerNames.Has(container.Name) { @@ -67,7 +68,7 @@ func (c *ImageChangeController) HandleImageRepo() { // The container image's tag name is by convention the same as the image ID it references _, _, _, containerImageID, err := imageapi.SplitDockerPullSpec(container.Image) if err != nil { - glog.V(4).Infof("Skipping container %s; container's image is invalid: %v", container.Name, err) + glog.V(4).Infof("Skipping container %s for config %s; container's image is invalid: %v", container.Name, labelFor(config), err) continue } @@ -79,13 +80,22 @@ func (c *ImageChangeController) HandleImageRepo() { } } + anyFailed := false for _, config := range configsToGenerate { - glog.V(4).Infof("Regenerating deploymentConfig %s/%s", config.Namespace, config.Name) err := c.regenerate(imageRepo, config, firedTriggersForConfig[config.Name]) if err != nil { - glog.V(2).Infof("Error regenerating deploymentConfig %s/%s: %v", config.Namespace, config.Name, err) + anyFailed = true + continue } + glog.V(4).Infof("Updated deploymentConfig %s in response to image change trigger", labelFor(config)) + } + + if anyFailed { + return fmt.Errorf("couldn't update some deploymentConfigs for trigger on imageRepo %s", labelForRepo(imageRepo)) } + + glog.V(4).Infof("Updated all configs for trigger on imageRepo %s", labelForRepo(imageRepo)) + return nil } // triggerMatchesImages decides whether a given trigger for config matches the provided image repo. @@ -115,10 +125,9 @@ func triggerMatchesImage(config *deployapi.DeploymentConfig, trigger *deployapi. func (c *ImageChangeController) regenerate(imageRepo *imageapi.ImageRepository, config *deployapi.DeploymentConfig, triggers []deployapi.DeploymentTriggerImageChangeParams) error { // Get a regenerated config which includes the new image repo references - newConfig, err := c.DeploymentConfigInterface.GenerateDeploymentConfig(config.Namespace, config.Name) + newConfig, err := c.DeploymentConfigClient.GenerateDeploymentConfig(config.Namespace, config.Name) if err != nil { - glog.V(2).Infof("Error generating new version of deploymentConfig %v", config.Name) - return err + return fmt.Errorf("error generating new version of deploymentConfig %s: %v", labelFor(config), err) } // Update the deployment config with the trigger that resulted in the new config @@ -154,11 +163,40 @@ func (c *ImageChangeController) regenerate(imageRepo *imageapi.ImageRepository, } // Persist the new config - _, err = c.DeploymentConfigInterface.UpdateDeploymentConfig(newConfig.Namespace, newConfig) + _, err = c.DeploymentConfigClient.UpdateDeploymentConfig(newConfig.Namespace, newConfig) if err != nil { - glog.V(2).Infof("Error updating deploymentConfig %v", newConfig.Name) - return err + return fmt.Errorf("couldn't update deploymentConfig %s: %v", labelFor(config), err) } return nil } + +func labelForRepo(imageRepo *imageapi.ImageRepository) string { + return fmt.Sprintf("%s/%s", imageRepo.Namespace, imageRepo.Name) +} + +// ImageChangeControllerDeploymentConfigClient abstracts access to DeploymentConfigs. +type ImageChangeControllerDeploymentConfigClient interface { + ListDeploymentConfigs() ([]*deployapi.DeploymentConfig, error) + UpdateDeploymentConfig(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) + GenerateDeploymentConfig(namespace, name string) (*deployapi.DeploymentConfig, error) +} + +// ImageChangeControllerDeploymentConfigClientImpl is a pluggable ChangeStrategy. +type ImageChangeControllerDeploymentConfigClientImpl struct { + ListDeploymentConfigsFunc func() ([]*deployapi.DeploymentConfig, error) + GenerateDeploymentConfigFunc func(namespace, name string) (*deployapi.DeploymentConfig, error) + UpdateDeploymentConfigFunc func(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) +} + +func (i *ImageChangeControllerDeploymentConfigClientImpl) ListDeploymentConfigs() ([]*deployapi.DeploymentConfig, error) { + return i.ListDeploymentConfigsFunc() +} + +func (i *ImageChangeControllerDeploymentConfigClientImpl) GenerateDeploymentConfig(namespace, name string) (*deployapi.DeploymentConfig, error) { + return i.GenerateDeploymentConfigFunc(namespace, name) +} + +func (i *ImageChangeControllerDeploymentConfigClientImpl) UpdateDeploymentConfig(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { + return i.UpdateDeploymentConfigFunc(namespace, config) +} diff --git a/pkg/deploy/controller/image_change_controller_test.go b/pkg/deploy/controller/image_change_controller_test.go index a4236ca423d6..c753d91338a2 100644 --- a/pkg/deploy/controller/image_change_controller_test.go +++ b/pkg/deploy/controller/image_change_controller_test.go @@ -7,32 +7,16 @@ import ( deployapi "github.com/openshift/origin/pkg/deploy/api" deployapitest "github.com/openshift/origin/pkg/deploy/api/test" - deploytest "github.com/openshift/origin/pkg/deploy/controller/test" imageapi "github.com/openshift/origin/pkg/image/api" ) -type testIcDeploymentConfigInterface struct { - UpdateDeploymentConfigFunc func(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) - GenerateDeploymentConfigFunc func(namespace, name string) (*deployapi.DeploymentConfig, error) -} - -func (i *testIcDeploymentConfigInterface) UpdateDeploymentConfig(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { - return i.UpdateDeploymentConfigFunc(namespace, config) -} -func (i *testIcDeploymentConfigInterface) GenerateDeploymentConfig(namespace, name string) (*deployapi.DeploymentConfig, error) { - return i.GenerateDeploymentConfigFunc(namespace, name) -} - const ( nonDefaultNamespace = "nondefaultnamespace" ) func TestUnregisteredContainer(t *testing.T) { - config := deployapitest.OkDeploymentConfig(1) - config.Triggers[0].ImageChangeParams.ContainerNames = []string{"container-3"} - controller := &ImageChangeController{ - DeploymentConfigInterface: &testIcDeploymentConfigInterface{ + DeploymentConfigClient: &ImageChangeControllerDeploymentConfigClientImpl{ UpdateDeploymentConfigFunc: func(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { t.Fatalf("unexpected deployment config update") return nil, nil @@ -41,23 +25,26 @@ func TestUnregisteredContainer(t *testing.T) { t.Fatalf("unexpected generator call") return nil, nil }, + ListDeploymentConfigsFunc: func() ([]*deployapi.DeploymentConfig, error) { + config := deployapitest.OkDeploymentConfig(1) + config.Triggers[0].ImageChangeParams.ContainerNames = []string{"container-3"} + + return []*deployapi.DeploymentConfig{config}, nil + }, }, - NextImageRepository: func() *imageapi.ImageRepository { - return tagUpdate() - }, - DeploymentConfigStore: deploytest.NewFakeDeploymentConfigStore(config), } // verify no-op - controller.HandleImageRepo() + err := controller.HandleImageRepo(tagUpdate()) + + if err != nil { + t.Fatalf("unexpected err: %v", err) + } } func TestImageChangeForNonAutomaticTag(t *testing.T) { - config := deployapitest.OkDeploymentConfig(1) - config.Triggers[0].ImageChangeParams.Automatic = false - controller := &ImageChangeController{ - DeploymentConfigInterface: &testIcDeploymentConfigInterface{ + DeploymentConfigClient: &ImageChangeControllerDeploymentConfigClientImpl{ UpdateDeploymentConfigFunc: func(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { t.Fatalf("unexpected deployment config update") return nil, nil @@ -66,22 +53,26 @@ func TestImageChangeForNonAutomaticTag(t *testing.T) { t.Fatalf("unexpected generator call") return nil, nil }, + ListDeploymentConfigsFunc: func() ([]*deployapi.DeploymentConfig, error) { + config := deployapitest.OkDeploymentConfig(1) + config.Triggers[0].ImageChangeParams.Automatic = false + + return []*deployapi.DeploymentConfig{config}, nil + }, }, - NextImageRepository: func() *imageapi.ImageRepository { - return tagUpdate() - }, - DeploymentConfigStore: deploytest.NewFakeDeploymentConfigStore(config), } // verify no-op - controller.HandleImageRepo() + err := controller.HandleImageRepo(tagUpdate()) + + if err != nil { + t.Fatalf("unexpected err: %v", err) + } } func TestImageChangeForUnregisteredTag(t *testing.T) { - config := imageChangeDeploymentConfig() - controller := &ImageChangeController{ - DeploymentConfigInterface: &testIcDeploymentConfigInterface{ + DeploymentConfigClient: &ImageChangeControllerDeploymentConfigClientImpl{ UpdateDeploymentConfigFunc: func(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { t.Fatalf("unexpected deployment config update") return nil, nil @@ -90,19 +81,21 @@ func TestImageChangeForUnregisteredTag(t *testing.T) { t.Fatalf("unexpected generator call") return nil, nil }, + ListDeploymentConfigsFunc: func() ([]*deployapi.DeploymentConfig, error) { + return []*deployapi.DeploymentConfig{imageChangeDeploymentConfig()}, nil + }, }, - NextImageRepository: func() *imageapi.ImageRepository { - imageRepo := tagUpdate() - imageRepo.Tags = map[string]string{ - "unknown-tag": "ref-1", - } - return imageRepo - }, - DeploymentConfigStore: deploytest.NewFakeDeploymentConfigStore(config), } // verify no-op - controller.HandleImageRepo() + imageRepo := tagUpdate() + imageRepo.Tags = map[string]string{ + "unknown-tag": "ref-1", + } + err := controller.HandleImageRepo(imageRepo) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } } func TestImageChangeMatchScenarios(t *testing.T) { @@ -187,7 +180,7 @@ func TestImageChangeMatchScenarios(t *testing.T) { generated := false controller := &ImageChangeController{ - DeploymentConfigInterface: &testIcDeploymentConfigInterface{ + DeploymentConfigClient: &ImageChangeControllerDeploymentConfigClientImpl{ UpdateDeploymentConfigFunc: func(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { if !s.matches { t.Fatalf("unexpected deployment config update for scenario: %v", s) @@ -202,15 +195,18 @@ func TestImageChangeMatchScenarios(t *testing.T) { generated = true return config, nil }, + ListDeploymentConfigsFunc: func() ([]*deployapi.DeploymentConfig, error) { + return []*deployapi.DeploymentConfig{config}, nil + }, }, - NextImageRepository: func() *imageapi.ImageRepository { - return updates[s.repo] - }, - DeploymentConfigStore: deploytest.NewFakeDeploymentConfigStore(config), } t.Logf("running scenario: %v", s) - controller.HandleImageRepo() + err := controller.HandleImageRepo(updates[s.repo]) + + if err != nil { + t.Fatalf("unexpected error for scenario %v: %v", s, err) + } // assert updates/generations occurred if s.matches && !updated { diff --git a/pkg/deploy/util/util.go b/pkg/deploy/util/util.go index b5f6aa4c6389..e04945937e23 100644 --- a/pkg/deploy/util/util.go +++ b/pkg/deploy/util/util.go @@ -11,6 +11,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" deployapi "github.com/openshift/origin/pkg/deploy/api" ) @@ -20,6 +21,10 @@ func LatestDeploymentNameForConfig(config *deployapi.DeploymentConfig) string { return config.Name + "-" + strconv.Itoa(config.LatestVersion) } +func DeployerPodNameForDeployment(deployment *api.ReplicationController) string { + return fmt.Sprintf("deploy-%s", deployment.Name) +} + // HashPodSpecs hashes a PodSpec into a uint64. // TODO: Resources are currently ignored due to the formats not surviving encoding/decoding // in a consistent manner (e.g. 0 is represented sometimes as 0.000) @@ -142,3 +147,18 @@ func MakeDeployment(config *deployapi.DeploymentConfig, codec runtime.Codec) (*a return deployment, nil } + +// ListWatcherImpl is a pluggable ListWatcher. +// TODO: This has been incorporated upstream; replace during a future rebase. +type ListWatcherImpl struct { + ListFunc func() (runtime.Object, error) + WatchFunc func(resourceVersion string) (watch.Interface, error) +} + +func (lw *ListWatcherImpl) List() (runtime.Object, error) { + return lw.ListFunc() +} + +func (lw *ListWatcherImpl) Watch(resourceVersion string) (watch.Interface, error) { + return lw.WatchFunc(resourceVersion) +}