diff --git a/pkg/cmd/server/origin/master.go b/pkg/cmd/server/origin/master.go index b37dc35fd3fd..71b96329e724 100644 --- a/pkg/cmd/server/origin/master.go +++ b/pkg/cmd/server/origin/master.go @@ -41,8 +41,11 @@ import ( osclient "github.com/openshift/origin/pkg/client" cmdutil "github.com/openshift/origin/pkg/cmd/util" "github.com/openshift/origin/pkg/cmd/util/clientcmd" + configchangecontroller "github.com/openshift/origin/pkg/deploy/controller/configchange" + deployerpodcontroller "github.com/openshift/origin/pkg/deploy/controller/deployerpod" + deploycontroller "github.com/openshift/origin/pkg/deploy/controller/deployment" deployconfigcontroller "github.com/openshift/origin/pkg/deploy/controller/deploymentconfig" - deploycontrollerfactory "github.com/openshift/origin/pkg/deploy/controller/factory" + imagechangecontroller "github.com/openshift/origin/pkg/deploy/controller/imagechange" deployconfiggenerator "github.com/openshift/origin/pkg/deploy/generator" deployregistry "github.com/openshift/origin/pkg/deploy/registry/deploy" deployconfigregistry "github.com/openshift/origin/pkg/deploy/registry/deployconfig" @@ -604,20 +607,30 @@ func (c *MasterConfig) RunBuildImageChangeTriggerController() { // RunDeploymentController starts the deployment controller process. func (c *MasterConfig) RunDeploymentController() { - osclient, kclient := c.DeploymentControllerClients() - factory := deploycontrollerfactory.DeploymentControllerFactory{ - Client: osclient, - KubeClient: kclient, - Codec: latest.Codec, - Environment: []api.EnvVar{ - {Name: "KUBERNETES_MASTER", Value: c.MasterAddr}, - {Name: "OPENSHIFT_MASTER", Value: c.MasterAddr}, - }, + _, kclient := c.DeploymentControllerClients() + env := []api.EnvVar{ + {Name: "KUBERNETES_MASTER", Value: c.MasterAddr}, + {Name: "OPENSHIFT_MASTER", Value: c.MasterAddr}, + } + env = append(env, clientcmd.EnvVarsFromConfig(c.DeployerClientConfig())...) + + factory := deploycontroller.DeploymentControllerFactory{ + KubeClient: kclient, + Codec: latest.Codec, + Environment: env, RecreateStrategyImage: c.ImageFor("deployer"), } - envvars := clientcmd.EnvVarsFromConfig(c.DeployerClientConfig()) - factory.Environment = append(factory.Environment, envvars...) + controller := factory.Create() + controller.Run() +} + +// RunDeployerPodController starts the deployer pod controller process. +func (c *MasterConfig) RunDeployerPodController() { + _, kclient := c.DeploymentControllerClients() + factory := deployerpodcontroller.DeployerPodControllerFactory{ + KubeClient: kclient, + } controller := factory.Create() controller.Run() @@ -636,7 +649,7 @@ func (c *MasterConfig) RunDeploymentConfigController() { func (c *MasterConfig) RunDeploymentConfigChangeController() { osclient, kclient := c.DeploymentConfigChangeControllerClients() - factory := deploycontrollerfactory.DeploymentConfigChangeControllerFactory{ + factory := configchangecontroller.DeploymentConfigChangeControllerFactory{ Client: osclient, KubeClient: kclient, Codec: latest.Codec, @@ -647,7 +660,7 @@ func (c *MasterConfig) RunDeploymentConfigChangeController() { func (c *MasterConfig) RunDeploymentImageChangeTriggerController() { osclient := c.DeploymentImageChangeControllerClient() - factory := deploycontrollerfactory.ImageChangeControllerFactory{Client: osclient} + factory := imagechangecontroller.ImageChangeControllerFactory{Client: osclient} controller := factory.Create() controller.Run() } diff --git a/pkg/cmd/server/start.go b/pkg/cmd/server/start.go index 917ba58a6677..a7b445e1a8c2 100644 --- a/pkg/cmd/server/start.go +++ b/pkg/cmd/server/start.go @@ -98,6 +98,7 @@ func (cfg Config) startMaster() error { openshiftConfig.RunBuildController() openshiftConfig.RunBuildImageChangeTriggerController() openshiftConfig.RunDeploymentController() + openshiftConfig.RunDeployerPodController() openshiftConfig.RunDeploymentConfigController() openshiftConfig.RunDeploymentConfigChangeController() openshiftConfig.RunDeploymentImageChangeTriggerController() diff --git a/pkg/deploy/controller/config_change_controller.go b/pkg/deploy/controller/config_change_controller.go deleted file mode 100644 index d490efe162a2..000000000000 --- a/pkg/deploy/controller/config_change_controller.go +++ /dev/null @@ -1,148 +0,0 @@ -package controller - -import ( - "fmt" - - "github.com/golang/glog" - - kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - kerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" - runtime "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - util "github.com/GoogleCloudPlatform/kubernetes/pkg/util" - - deployapi "github.com/openshift/origin/pkg/deploy/api" - deployutil "github.com/openshift/origin/pkg/deploy/util" -) - -// DeploymentConfigChangeController watches for changes to DeploymentConfigs and regenerates them only -// when detecting a change to the PodTemplate of a DeploymentConfig containing a ConfigChange trigger. -type DeploymentConfigChangeController struct { - ChangeStrategy ChangeStrategy - NextDeploymentConfig func() *deployapi.DeploymentConfig - Codec runtime.Codec - // Stop is an optional channel that controls when the controller exits - Stop <-chan struct{} -} - -// Run watches for config change events. -func (dc *DeploymentConfigChangeController) Run() { - go util.Until(func() { - err := dc.HandleDeploymentConfig(dc.NextDeploymentConfig()) - if err != nil { - glog.Errorf("%v", err) - } - }, 0, dc.Stop) -} - -// HandleDeploymentConfig handles the next DeploymentConfig change that happens. -func (dc *DeploymentConfigChangeController) HandleDeploymentConfig(config *deployapi.DeploymentConfig) error { - hasChangeTrigger := false - for _, trigger := range config.Triggers { - if trigger.Type == deployapi.DeploymentTriggerOnConfigChange { - hasChangeTrigger = true - break - } - } - - if !hasChangeTrigger { - glog.V(4).Infof("Ignoring config %s; no change triggers detected", labelFor(config)) - return nil - } - - if config.LatestVersion == 0 { - _, _, err := dc.generateDeployment(config) - if err != nil { - return fmt.Errorf("couldn't create initial deployment for config %s: %v", labelFor(config), err) - } - glog.V(4).Infof("Created initial deployment for config %s", labelFor(config)) - return nil - } - - latestDeploymentName := deployutil.LatestDeploymentNameForConfig(config) - deployment, err := dc.ChangeStrategy.GetDeployment(config.Namespace, latestDeploymentName) - if err != nil { - if kerrors.IsNotFound(err) { - glog.V(4).Infof("Ignoring config change for %s; no existing deployment found", labelFor(config)) - return nil - } - return fmt.Errorf("couldn't retrieve deployment for %s: %v", labelFor(config), err) - } - - deployedConfig, err := deployutil.DecodeDeploymentConfig(deployment, dc.Codec) - if err != nil { - return fmt.Errorf("error decoding deploymentConfig from deployment %s for config %s: %v", labelForDeployment(deployment), labelFor(config), err) - } - - if deployutil.PodSpecsEqual(config.Template.ControllerTemplate.Template.Spec, deployedConfig.Template.ControllerTemplate.Template.Spec) { - glog.V(4).Infof("Ignoring config change for %s (latestVersion=%d); same as deployment %s", labelFor(config), config.LatestVersion, labelForDeployment(deployment)) - return nil - } - - fromVersion, toVersion, err := dc.generateDeployment(config) - if err != nil { - return fmt.Errorf("couldn't generate deployment for config %s: %v", labelFor(config), err) - } - glog.V(4).Infof("Updated config %s from version %d to %d for existing deployment %s", labelFor(config), fromVersion, toVersion, labelForDeployment(deployment)) - return nil -} - -func (dc *DeploymentConfigChangeController) generateDeployment(config *deployapi.DeploymentConfig) (int, int, error) { - newConfig, err := dc.ChangeStrategy.GenerateDeploymentConfig(config.Namespace, config.Name) - if err != nil { - return config.LatestVersion, 0, fmt.Errorf("Error generating new version of deploymentConfig %s: %v", labelFor(config), err) - } - - if newConfig.LatestVersion == config.LatestVersion { - newConfig.LatestVersion++ - } - - // set the trigger details for the new deployment config - causes := []*deployapi.DeploymentCause{} - causes = append(causes, - &deployapi.DeploymentCause{ - Type: deployapi.DeploymentTriggerOnConfigChange, - }) - newConfig.Details = &deployapi.DeploymentDetails{ - Causes: causes, - } - - // This update is atomic. If it fails because a newer resource was already persisted, that's - // okay - we can just ignore the update for the old resource and any changes to the more - // current config will be captured in future events. - if _, err = dc.ChangeStrategy.UpdateDeploymentConfig(config.Namespace, newConfig); err != nil { - return config.LatestVersion, newConfig.LatestVersion, fmt.Errorf("couldn't update deploymentConfig %s: %v", labelFor(config), err) - } - - return config.LatestVersion, newConfig.LatestVersion, nil -} - -// ChangeStrategy knows how to generate and update DeploymentConfigs. -type ChangeStrategy interface { - GetDeployment(namespace, name string) (*kapi.ReplicationController, error) - GenerateDeploymentConfig(namespace, name string) (*deployapi.DeploymentConfig, error) - UpdateDeploymentConfig(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) -} - -// ChangeStrategyImpl is a pluggable ChangeStrategy. -type ChangeStrategyImpl struct { - GetDeploymentFunc func(namespace, name string) (*kapi.ReplicationController, error) - GenerateDeploymentConfigFunc func(namespace, name string) (*deployapi.DeploymentConfig, error) - UpdateDeploymentConfigFunc func(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) -} - -func (i *ChangeStrategyImpl) GetDeployment(namespace, name string) (*kapi.ReplicationController, error) { - return i.GetDeploymentFunc(namespace, name) -} - -func (i *ChangeStrategyImpl) GenerateDeploymentConfig(namespace, name string) (*deployapi.DeploymentConfig, error) { - return i.GenerateDeploymentConfigFunc(namespace, name) -} - -func (i *ChangeStrategyImpl) UpdateDeploymentConfig(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { - return i.UpdateDeploymentConfigFunc(namespace, config) -} - -// labelFor builds a string identifier for a DeploymentConfig. -func labelFor(config *deployapi.DeploymentConfig) string { - return fmt.Sprintf("%s/%s:%d", config.Namespace, config.Name, config.LatestVersion) -} diff --git a/pkg/deploy/controller/configchange/controller.go b/pkg/deploy/controller/configchange/controller.go new file mode 100644 index 000000000000..c5a447783440 --- /dev/null +++ b/pkg/deploy/controller/configchange/controller.go @@ -0,0 +1,155 @@ +package configchange + +import ( + "fmt" + + "github.com/golang/glog" + + kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + kerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" + + deployapi "github.com/openshift/origin/pkg/deploy/api" + deployutil "github.com/openshift/origin/pkg/deploy/util" +) + +// DeploymentConfigChangeController increments the version of a +// DeploymentConfig which has a config change trigger when a pod template +// change is detected. +// +// Use the DeploymentConfigChangeControllerFactory to create this controller. +type DeploymentConfigChangeController struct { + // changeStrategy knows how to generate and update DeploymentConfigs. + changeStrategy changeStrategy + // decodeConfig knows how to decode the deploymentConfig from a deployment's annotations. + decodeConfig func(deployment *kapi.ReplicationController) (*deployapi.DeploymentConfig, error) +} + +// fatalError is an error which can't be retried. +type fatalError string + +func (e fatalError) Error() string { return "fatal error handling config: " + string(e) } + +// Handle processes change triggers for config. +func (c *DeploymentConfigChangeController) Handle(config *deployapi.DeploymentConfig) error { + hasChangeTrigger := false + for _, trigger := range config.Triggers { + if trigger.Type == deployapi.DeploymentTriggerOnConfigChange { + hasChangeTrigger = true + break + } + } + + if !hasChangeTrigger { + glog.V(4).Infof("Ignoring config %s; no change triggers detected", labelFor(config)) + return nil + } + + if config.LatestVersion == 0 { + _, _, err := c.generateDeployment(config) + if err != nil { + if kerrors.IsConflict(err) { + return fatalError(fmt.Sprintf("config %s updated since retrieval; aborting trigger", labelFor(config), err)) + } + return fmt.Errorf("couldn't create initial deployment for config %s: %v", labelFor(config), err) + } + glog.V(4).Infof("Created initial deployment for config %s", labelFor(config)) + return nil + } + + latestDeploymentName := deployutil.LatestDeploymentNameForConfig(config) + deployment, err := c.changeStrategy.getDeployment(config.Namespace, latestDeploymentName) + if err != nil { + if kerrors.IsNotFound(err) { + glog.V(4).Infof("Ignoring config change for %s; no existing deployment found", labelFor(config)) + return nil + } + return fmt.Errorf("couldn't retrieve deployment for %s: %v", labelFor(config), err) + } + + deployedConfig, err := c.decodeConfig(deployment) + if err != nil { + return fatalError(fmt.Sprintf("error decoding deploymentConfig from deployment %s for config %s: %v", labelForDeployment(deployment), labelFor(config), err)) + } + + if deployutil.PodSpecsEqual(config.Template.ControllerTemplate.Template.Spec, deployedConfig.Template.ControllerTemplate.Template.Spec) { + glog.V(4).Infof("Ignoring config change for %s (latestVersion=%d); same as deployment %s", labelFor(config), config.LatestVersion, labelForDeployment(deployment)) + return nil + } + + fromVersion, toVersion, err := c.generateDeployment(config) + if err != nil { + if kerrors.IsConflict(err) { + return fatalError(fmt.Sprintf("config %s updated since retrieval; aborting trigger: %v", labelFor(config), err)) + } + return fmt.Errorf("couldn't generate deployment for config %s: %v", labelFor(config), err) + } + glog.V(4).Infof("Updated config %s from version %d to %d for existing deployment %s", labelFor(config), fromVersion, toVersion, labelForDeployment(deployment)) + return nil +} + +func (c *DeploymentConfigChangeController) generateDeployment(config *deployapi.DeploymentConfig) (int, int, error) { + newConfig, err := c.changeStrategy.generateDeploymentConfig(config.Namespace, config.Name) + if err != nil { + return config.LatestVersion, 0, err + } + + if newConfig.LatestVersion == config.LatestVersion { + newConfig.LatestVersion++ + } + + // set the trigger details for the new deployment config + causes := []*deployapi.DeploymentCause{} + causes = append(causes, + &deployapi.DeploymentCause{ + Type: deployapi.DeploymentTriggerOnConfigChange, + }) + newConfig.Details = &deployapi.DeploymentDetails{ + Causes: causes, + } + + // This update is atomic. If it fails because a newer resource was already persisted, that's + // okay - we can just ignore the update for the old resource and any changes to the more + // current config will be captured in future events. + updatedConfig, err := c.changeStrategy.updateDeploymentConfig(config.Namespace, newConfig) + if err != nil { + return config.LatestVersion, newConfig.LatestVersion, err + } + + return config.LatestVersion, updatedConfig.LatestVersion, nil +} + +// changeStrategy knows how to generate and update DeploymentConfigs. +type changeStrategy interface { + getDeployment(namespace, name string) (*kapi.ReplicationController, error) + generateDeploymentConfig(namespace, name string) (*deployapi.DeploymentConfig, error) + updateDeploymentConfig(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) +} + +// changeStrategyImpl is a pluggable changeStrategy. +type changeStrategyImpl struct { + getDeploymentFunc func(namespace, name string) (*kapi.ReplicationController, error) + generateDeploymentConfigFunc func(namespace, name string) (*deployapi.DeploymentConfig, error) + updateDeploymentConfigFunc func(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) +} + +func (i *changeStrategyImpl) getDeployment(namespace, name string) (*kapi.ReplicationController, error) { + return i.getDeploymentFunc(namespace, name) +} + +func (i *changeStrategyImpl) generateDeploymentConfig(namespace, name string) (*deployapi.DeploymentConfig, error) { + return i.generateDeploymentConfigFunc(namespace, name) +} + +func (i *changeStrategyImpl) updateDeploymentConfig(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { + return i.updateDeploymentConfigFunc(namespace, config) +} + +// labelFor builds a string identifier for a DeploymentConfig. +func labelFor(config *deployapi.DeploymentConfig) string { + return fmt.Sprintf("%s/%s:%d", config.Namespace, config.Name, config.LatestVersion) +} + +// labelForDeployment builds a string identifier for a DeploymentConfig. +func labelForDeployment(deployment *kapi.ReplicationController) string { + return fmt.Sprintf("%s/%s", deployment.Namespace, deployment.Name) +} diff --git a/pkg/deploy/controller/config_change_controller_test.go b/pkg/deploy/controller/configchange/controller_test.go similarity index 61% rename from pkg/deploy/controller/config_change_controller_test.go rename to pkg/deploy/controller/configchange/controller_test.go index ca9edd4582bd..b2bc4e518136 100644 --- a/pkg/deploy/controller/config_change_controller_test.go +++ b/pkg/deploy/controller/configchange/controller_test.go @@ -1,4 +1,4 @@ -package controller +package configchange import ( "testing" @@ -11,16 +11,19 @@ import ( deployutil "github.com/openshift/origin/pkg/deploy/util" ) -// Test the controller's response to a new DeploymentConfig with a config change trigger. -func TestNewConfigWithoutTrigger(t *testing.T) { +// TestHandle_newConfigNoTriggers ensures that a change to a config with no +// triggers doesn't result in a new config version bump. +func TestHandle_newConfigNoTriggers(t *testing.T) { controller := &DeploymentConfigChangeController{ - Codec: api.Codec, - ChangeStrategy: &ChangeStrategyImpl{ - GenerateDeploymentConfigFunc: func(namespace, name string) (*deployapi.DeploymentConfig, error) { + decodeConfig: func(deployment *kapi.ReplicationController) (*deployapi.DeploymentConfig, error) { + return deployutil.DecodeDeploymentConfig(deployment, api.Codec) + }, + changeStrategy: &changeStrategyImpl{ + generateDeploymentConfigFunc: func(namespace, name string) (*deployapi.DeploymentConfig, error) { t.Fatalf("unexpected generation of deploymentConfig") return nil, nil }, - UpdateDeploymentConfigFunc: func(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { + updateDeploymentConfigFunc: func(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { t.Fatalf("unexpected update of deploymentConfig") return config, nil }, @@ -29,23 +32,28 @@ func TestNewConfigWithoutTrigger(t *testing.T) { config := deployapitest.OkDeploymentConfig(1) config.Triggers = []deployapi.DeploymentTriggerPolicy{} - err := controller.HandleDeploymentConfig(config) + err := controller.Handle(config) if err != nil { t.Fatalf("unexpected error: %v", err) } } -func TestNewConfigWithTrigger(t *testing.T) { +// TestHandle_newConfigTriggers ensures that the creation of a new config +// (with version 0) with a config change trigger results in a version bump and +// cause update for initial deployment. +func TestHandle_newConfigTriggers(t *testing.T) { var updated *deployapi.DeploymentConfig controller := &DeploymentConfigChangeController{ - Codec: api.Codec, - ChangeStrategy: &ChangeStrategyImpl{ - GenerateDeploymentConfigFunc: func(namespace, name string) (*deployapi.DeploymentConfig, error) { + decodeConfig: func(deployment *kapi.ReplicationController) (*deployapi.DeploymentConfig, error) { + return deployutil.DecodeDeploymentConfig(deployment, api.Codec) + }, + changeStrategy: &changeStrategyImpl{ + generateDeploymentConfigFunc: func(namespace, name string) (*deployapi.DeploymentConfig, error) { return deployapitest.OkDeploymentConfig(1), nil }, - UpdateDeploymentConfigFunc: func(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { + updateDeploymentConfigFunc: func(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { updated = config return config, nil }, @@ -54,7 +62,7 @@ func TestNewConfigWithTrigger(t *testing.T) { config := deployapitest.OkDeploymentConfig(0) config.Triggers = []deployapi.DeploymentTriggerPolicy{deployapitest.OkConfigChangeTrigger()} - err := controller.HandleDeploymentConfig(config) + err := controller.Handle(config) if err != nil { t.Fatalf("unexpected error: %v", err) @@ -77,21 +85,25 @@ func TestNewConfigWithTrigger(t *testing.T) { } } -// Test the controller's response when the pod template is changed -func TestChangeWithTemplateDiff(t *testing.T) { +// TestHandle_changeWithTemplateDiff ensures that a pod template change to a +// config with a config change trigger results in a version bump and cause +// update. +func TestHandle_changeWithTemplateDiff(t *testing.T) { var updated *deployapi.DeploymentConfig controller := &DeploymentConfigChangeController{ - Codec: api.Codec, - ChangeStrategy: &ChangeStrategyImpl{ - GenerateDeploymentConfigFunc: func(namespace, name string) (*deployapi.DeploymentConfig, error) { + decodeConfig: func(deployment *kapi.ReplicationController) (*deployapi.DeploymentConfig, error) { + return deployutil.DecodeDeploymentConfig(deployment, api.Codec) + }, + changeStrategy: &changeStrategyImpl{ + generateDeploymentConfigFunc: func(namespace, name string) (*deployapi.DeploymentConfig, error) { return deployapitest.OkDeploymentConfig(2), nil }, - UpdateDeploymentConfigFunc: func(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { + updateDeploymentConfigFunc: func(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { updated = config return config, nil }, - GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { + getDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { deployment, _ := deployutil.MakeDeployment(deployapitest.OkDeploymentConfig(1), kapi.Codec) return deployment, nil }, @@ -101,7 +113,7 @@ func TestChangeWithTemplateDiff(t *testing.T) { config := deployapitest.OkDeploymentConfig(1) config.Triggers = []deployapi.DeploymentTriggerPolicy{deployapitest.OkConfigChangeTrigger()} config.Template.ControllerTemplate.Template.Spec.Containers[1].Name = "modified" - err := controller.HandleDeploymentConfig(config) + err := controller.Handle(config) if err != nil { t.Fatalf("unexpected error: %v", err) @@ -124,7 +136,9 @@ func TestChangeWithTemplateDiff(t *testing.T) { } } -func TestChangeWithoutTemplateDiff(t *testing.T) { +// TestHandle_changeWithoutTemplateDiff ensures that an updated config with no +// pod template diff results in the config version remaining the same. +func TestHandle_changeWithoutTemplateDiff(t *testing.T) { config := deployapitest.OkDeploymentConfig(1) config.Triggers = []deployapi.DeploymentTriggerPolicy{deployapitest.OkConfigChangeTrigger()} @@ -132,24 +146,26 @@ func TestChangeWithoutTemplateDiff(t *testing.T) { updated := false controller := &DeploymentConfigChangeController{ - Codec: api.Codec, - ChangeStrategy: &ChangeStrategyImpl{ - GenerateDeploymentConfigFunc: func(namespace, name string) (*deployapi.DeploymentConfig, error) { + decodeConfig: func(deployment *kapi.ReplicationController) (*deployapi.DeploymentConfig, error) { + return deployutil.DecodeDeploymentConfig(deployment, api.Codec) + }, + changeStrategy: &changeStrategyImpl{ + generateDeploymentConfigFunc: func(namespace, name string) (*deployapi.DeploymentConfig, error) { generated = true return config, nil }, - UpdateDeploymentConfigFunc: func(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { + updateDeploymentConfigFunc: func(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { updated = true return config, nil }, - GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { + getDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { deployment, _ := deployutil.MakeDeployment(deployapitest.OkDeploymentConfig(1), kapi.Codec) return deployment, nil }, }, } - err := controller.HandleDeploymentConfig(config) + err := controller.Handle(config) if err != nil { t.Fatalf("unexpected error: %v", err) diff --git a/pkg/deploy/controller/configchange/factory.go b/pkg/deploy/controller/configchange/factory.go new file mode 100644 index 000000000000..efa41ce30c4d --- /dev/null +++ b/pkg/deploy/controller/configchange/factory.go @@ -0,0 +1,74 @@ +package configchange + +import ( + kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + kclient "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + kutil "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + + osclient "github.com/openshift/origin/pkg/client" + controller "github.com/openshift/origin/pkg/controller" + deployapi "github.com/openshift/origin/pkg/deploy/api" + deployutil "github.com/openshift/origin/pkg/deploy/util" +) + +// DeploymentConfigChangeControllerFactory can create a +// DeploymentConfigChangeController that watches all DeploymentConfigs. +type DeploymentConfigChangeControllerFactory struct { + // Client is an OpenShift client. + Client osclient.Interface + // KubeClient is a Kubernetes client. + KubeClient kclient.Interface + // Codec is used for encoding/decoding. + Codec runtime.Codec +} + +// Create creates a DeploymentConfigChangeController. +func (factory *DeploymentConfigChangeControllerFactory) Create() controller.RunnableController { + deploymentConfigLW := &deployutil.ListWatcherImpl{ + ListFunc: func() (runtime.Object, error) { + return factory.Client.DeploymentConfigs(kapi.NamespaceAll).List(labels.Everything(), labels.Everything()) + }, + WatchFunc: func(resourceVersion string) (watch.Interface, error) { + return factory.Client.DeploymentConfigs(kapi.NamespaceAll).Watch(labels.Everything(), labels.Everything(), resourceVersion) + }, + } + queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc) + cache.NewReflector(deploymentConfigLW, &deployapi.DeploymentConfig{}, queue).Run() + + changeController := &DeploymentConfigChangeController{ + changeStrategy: &changeStrategyImpl{ + getDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { + return factory.KubeClient.ReplicationControllers(namespace).Get(name) + }, + generateDeploymentConfigFunc: func(namespace, name string) (*deployapi.DeploymentConfig, error) { + return factory.Client.DeploymentConfigs(namespace).Generate(name) + }, + updateDeploymentConfigFunc: func(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { + return factory.Client.DeploymentConfigs(namespace).Update(config) + }, + }, + decodeConfig: func(deployment *kapi.ReplicationController) (*deployapi.DeploymentConfig, error) { + return deployutil.DecodeDeploymentConfig(deployment, factory.Codec) + }, + } + + return &controller.RetryController{ + Queue: queue, + RetryManager: controller.NewQueueRetryManager(queue, cache.MetaNamespaceKeyFunc, 1), + ShouldRetry: func(obj interface{}, err error) bool { + if _, isFatal := err.(fatalError); isFatal { + kutil.HandleError(err) + return false + } + return true + }, + Handle: func(obj interface{}) error { + config := obj.(*deployapi.DeploymentConfig) + return changeController.Handle(config) + }, + } +} diff --git a/pkg/deploy/controller/deployerpod/controller.go b/pkg/deploy/controller/deployerpod/controller.go new file mode 100644 index 000000000000..a9cf647c0b1a --- /dev/null +++ b/pkg/deploy/controller/deployerpod/controller.go @@ -0,0 +1,91 @@ +package deployerpod + +import ( + "fmt" + + "github.com/golang/glog" + + kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + + deployapi "github.com/openshift/origin/pkg/deploy/api" +) + +// DeployerPodController keeps a deployment's status in sync with the deployer pod +// handling the deployment. +// +// Use the DeployerPodControllerFactory to create this controller. +type DeployerPodController struct { + // deploymentClient provides access to deployments. + deploymentClient deploymentClient +} + +// Handle syncs pod's status with any associated deployment. +func (c *DeployerPodController) Handle(pod *kapi.Pod) error { + // Verify the assumption that we'll be given only pods correlated to a deployment + deploymentName, hasDeploymentName := pod.Annotations[deployapi.DeploymentAnnotation] + if !hasDeploymentName { + glog.V(2).Infof("Ignoring pod %s; no deployment annotation found", pod.Name) + return nil + } + + deployment, deploymentErr := c.deploymentClient.getDeployment(pod.Namespace, deploymentName) + if deploymentErr != nil { + return fmt.Errorf("couldn't get deployment %s/%s associated with pod %s", pod.Namespace, deploymentName, pod.Name) + } + + currentStatus := statusFor(deployment) + nextStatus := currentStatus + + switch pod.Status.Phase { + case kapi.PodRunning: + nextStatus = deployapi.DeploymentStatusRunning + case kapi.PodSucceeded, kapi.PodFailed: + nextStatus = deployapi.DeploymentStatusComplete + // Detect failure based on the container state + for _, info := range pod.Status.Info { + if info.State.Termination != nil && info.State.Termination.ExitCode != 0 { + nextStatus = deployapi.DeploymentStatusFailed + } + } + } + + if currentStatus != nextStatus { + deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(nextStatus) + if _, err := c.deploymentClient.updateDeployment(deployment.Namespace, deployment); err != nil { + return fmt.Errorf("couldn't update deployment %s to status %s: %v", labelForDeployment(deployment), nextStatus, err) + } + glog.V(2).Infof("Updated deployment %s status from %s to %s", labelForDeployment(deployment), currentStatus, nextStatus) + } + + return nil +} + +// labelFor builds a string identifier for a DeploymentConfig. +func labelForDeployment(deployment *kapi.ReplicationController) string { + return fmt.Sprintf("%s/%s", deployment.Namespace, deployment.Name) +} + +// statusFor gets the DeploymentStatus for deployment from its annotations. +func statusFor(deployment *kapi.ReplicationController) deployapi.DeploymentStatus { + return deployapi.DeploymentStatus(deployment.Annotations[deployapi.DeploymentStatusAnnotation]) +} + +// deploymentClient abstracts access to deployments. +type deploymentClient interface { + getDeployment(namespace, name string) (*kapi.ReplicationController, error) + updateDeployment(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) +} + +// deploymentClientImpl is a pluggable deploymentControllerDeploymentClient. +type deploymentClientImpl struct { + getDeploymentFunc func(namespace, name string) (*kapi.ReplicationController, error) + updateDeploymentFunc func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) +} + +func (i *deploymentClientImpl) getDeployment(namespace, name string) (*kapi.ReplicationController, error) { + return i.getDeploymentFunc(namespace, name) +} + +func (i *deploymentClientImpl) updateDeployment(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + return i.updateDeploymentFunc(namespace, deployment) +} diff --git a/pkg/deploy/controller/deployerpod/controller_test.go b/pkg/deploy/controller/deployerpod/controller_test.go new file mode 100644 index 000000000000..63e5dca2012c --- /dev/null +++ b/pkg/deploy/controller/deployerpod/controller_test.go @@ -0,0 +1,202 @@ +package deployerpod + +import ( + "testing" + + kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + kerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" + + deployapi "github.com/openshift/origin/pkg/deploy/api" + deploytest "github.com/openshift/origin/pkg/deploy/api/test" + deployutil "github.com/openshift/origin/pkg/deploy/util" +) + +// TestHandle_uncorrelatedPod ensures that pods uncorrelated with a deployment +// are ignored. +func TestHandle_uncorrelatedPod(t *testing.T) { + controller := &DeployerPodController{ + deploymentClient: &deploymentClientImpl{ + updateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + t.Fatalf("unexpected deployment update") + return nil, nil + }, + }, + } + + // Verify no-op + pod := runningPod() + pod.Annotations = make(map[string]string) + err := controller.Handle(pod) + + if err != nil { + t.Fatalf("unexpected err: %v", err) + } +} + +// TestHandle_orphanedPod ensures that deployer pods associated with a non- +// existent deployment result in an error. +func TestHandle_orphanedPod(t *testing.T) { + controller := &DeployerPodController{ + deploymentClient: &deploymentClientImpl{ + updateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + t.Fatalf("Unexpected deployment update") + return nil, nil + }, + getDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { + return nil, kerrors.NewNotFound("ReplicationController", name) + }, + }, + } + + err := controller.Handle(runningPod()) + + if err == nil { + t.Fatalf("expected an error") + } +} + +// TestHandle_runningPod ensures that a running deployer pod results in a +// transition of the deployment's status to running. +func TestHandle_runningPod(t *testing.T) { + var updatedDeployment *kapi.ReplicationController + + controller := &DeployerPodController{ + deploymentClient: &deploymentClientImpl{ + getDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { + config := deploytest.OkDeploymentConfig(1) + deployment, _ := deployutil.MakeDeployment(config, kapi.Codec) + deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(deployapi.DeploymentStatusPending) + return deployment, nil + }, + updateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + updatedDeployment = deployment + return deployment, nil + }, + }, + } + + err := controller.Handle(runningPod()) + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if updatedDeployment == nil { + t.Fatalf("expected deployment update") + } + + if e, a := deployapi.DeploymentStatusRunning, statusFor(updatedDeployment); e != a { + t.Fatalf("expected updated deployment status %s, got %s", e, a) + } +} + +// TestHandle_podTerminatedOk ensures that a successfully completed deployer +// pod results in a transition of the deployment's status to complete. +func TestHandle_podTerminatedOk(t *testing.T) { + var updatedDeployment *kapi.ReplicationController + + controller := &DeployerPodController{ + deploymentClient: &deploymentClientImpl{ + getDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { + config := deploytest.OkDeploymentConfig(1) + deployment, _ := deployutil.MakeDeployment(config, kapi.Codec) + deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(deployapi.DeploymentStatusRunning) + return deployment, nil + }, + updateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + updatedDeployment = deployment + return deployment, nil + }, + }, + } + + err := controller.Handle(succeededPod()) + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if updatedDeployment == nil { + t.Fatalf("expected deployment update") + } + + if e, a := deployapi.DeploymentStatusComplete, statusFor(updatedDeployment); e != a { + t.Fatalf("expected updated deployment status %s, got %s", e, a) + } +} + +// TestHandle_podTerminatedFail ensures that a failed deployer pod results in +// a transition of the deployment's status to failed. +func TestHandle_podTerminatedFail(t *testing.T) { + var updatedDeployment *kapi.ReplicationController + + controller := &DeployerPodController{ + deploymentClient: &deploymentClientImpl{ + getDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { + config := deploytest.OkDeploymentConfig(1) + deployment, _ := deployutil.MakeDeployment(config, kapi.Codec) + deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(deployapi.DeploymentStatusRunning) + return deployment, nil + }, + updateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + updatedDeployment = deployment + return deployment, nil + }, + }, + } + + err := controller.Handle(failedPod()) + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if updatedDeployment == nil { + t.Fatalf("expected deployment update") + } + + if e, a := deployapi.DeploymentStatusFailed, statusFor(updatedDeployment); e != a { + t.Fatalf("expected updated deployment status %s, got %s", e, a) + } +} + +func okPod() *kapi.Pod { + return &kapi.Pod{ + ObjectMeta: kapi.ObjectMeta{ + Name: "deploy-deploy1", + Annotations: map[string]string{ + deployapi.DeploymentAnnotation: "1234", + }, + }, + Status: kapi.PodStatus{ + Info: kapi.PodInfo{ + "container1": kapi.ContainerStatus{}, + }, + }, + } +} + +func succeededPod() *kapi.Pod { + p := okPod() + p.Status.Phase = kapi.PodSucceeded + return p +} + +func failedPod() *kapi.Pod { + p := okPod() + p.Status.Phase = kapi.PodFailed + p.Status.Info["container1"] = kapi.ContainerStatus{ + State: kapi.ContainerState{ + Termination: &kapi.ContainerStateTerminated{ + ExitCode: 1, + }, + }, + } + return p +} + +func runningPod() *kapi.Pod { + p := okPod() + p.Status.Phase = kapi.PodRunning + return p +} diff --git a/pkg/deploy/controller/deployerpod/factory.go b/pkg/deploy/controller/deployerpod/factory.go new file mode 100644 index 000000000000..43a993ad2355 --- /dev/null +++ b/pkg/deploy/controller/deployerpod/factory.go @@ -0,0 +1,128 @@ +package deployerpod + +import ( + "time" + + "github.com/golang/glog" + + kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + kclient "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + + controller "github.com/openshift/origin/pkg/controller" + deployapi "github.com/openshift/origin/pkg/deploy/api" + deployutil "github.com/openshift/origin/pkg/deploy/util" +) + +// DeployerPodControllerFactory can create a DeployerPodController which gets +// pods from a queue populated from a watch of all pods filtered by a cache of +// deployments associated with pods. +type DeployerPodControllerFactory struct { + // KubeClient is a Kubernetes client. + KubeClient kclient.Interface +} + +// Create creates a DeployerPodController. +func (factory *DeployerPodControllerFactory) Create() controller.RunnableController { + deploymentLW := &deployutil.ListWatcherImpl{ + ListFunc: func() (runtime.Object, error) { + return factory.KubeClient.ReplicationControllers(kapi.NamespaceAll).List(labels.Everything()) + }, + WatchFunc: func(resourceVersion string) (watch.Interface, error) { + return factory.KubeClient.ReplicationControllers(kapi.NamespaceAll).Watch(labels.Everything(), labels.Everything(), resourceVersion) + }, + } + deploymentQueue := cache.NewFIFO(cache.MetaNamespaceKeyFunc) + cache.NewReflector(deploymentLW, &kapi.ReplicationController{}, deploymentQueue).Run() + + deploymentStore := cache.NewStore(cache.MetaNamespaceKeyFunc) + cache.NewReflector(deploymentLW, &kapi.ReplicationController{}, deploymentStore).Run() + + // Kubernetes does not currently synchronize Pod status in storage with a Pod's container + // states. Because of this, we can't receive events related to container (and thus Pod) + // state changes, such as Running -> Terminated. As a workaround, populate the FIFO with + // a polling implementation which relies on client calls to list Pods - the Get/List + // REST implementations will populate the synchronized container/pod status on-demand. + // + // TODO: Find a way to get watch events for Pod/container status updates. The polling + // strategy is horribly inefficient and should be addressed upstream somehow. + podQueue := cache.NewFIFO(cache.MetaNamespaceKeyFunc) + pollFunc := func() (cache.Enumerator, error) { + return pollPods(deploymentStore, factory.KubeClient) + } + cache.NewPoller(pollFunc, 10*time.Second, podQueue).Run() + + podController := &DeployerPodController{ + deploymentClient: &deploymentClientImpl{ + getDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { + return factory.KubeClient.ReplicationControllers(namespace).Get(name) + }, + updateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + return factory.KubeClient.ReplicationControllers(namespace).Update(deployment) + }, + }, + } + + return &controller.RetryController{ + Queue: podQueue, + RetryManager: controller.NewQueueRetryManager(podQueue, cache.MetaNamespaceKeyFunc, 1), + ShouldRetry: func(obj interface{}, err error) bool { + return true + }, + Handle: func(obj interface{}) error { + pod := obj.(*kapi.Pod) + return podController.Handle(pod) + }, + } +} + +// pollPods lists all pods associated with pending or running deployments and returns +// a cache.Enumerator suitable for use with a cache.Poller. +func pollPods(deploymentStore cache.Store, kClient kclient.PodsNamespacer) (cache.Enumerator, error) { + list := &kapi.PodList{} + + for _, obj := range deploymentStore.List() { + deployment := obj.(*kapi.ReplicationController) + + switch deployapi.DeploymentStatus(deployment.Annotations[deployapi.DeploymentStatusAnnotation]) { + case deployapi.DeploymentStatusPending, deployapi.DeploymentStatusRunning: + // Validate the correlating pod annotation + podID, hasPodID := deployment.Annotations[deployapi.DeploymentPodAnnotation] + if !hasPodID { + glog.V(2).Infof("Unexpected state: deployment %s has no pod annotation; skipping pod polling", deployment.Name) + continue + } + + pod, err := kClient.Pods(deployment.Namespace).Get(podID) + if err != nil { + glog.V(2).Infof("Couldn't find pod %s for deployment %s: %#v", podID, deployment.Name, err) + continue + } + + list.Items = append(list.Items, *pod) + } + } + + return &podEnumerator{list}, nil +} + +// podEnumerator allows a cache.Poller to enumerate items in an api.PodList +type podEnumerator struct { + *kapi.PodList +} + +// Len returns the number of items in the pod list. +func (pe *podEnumerator) Len() int { + if pe.PodList == nil { + return 0 + } + return len(pe.Items) +} + +// Get returns the item (and ID) with the particular index. +func (pe *podEnumerator) Get(index int) interface{} { + return &pe.Items[index] +} diff --git a/pkg/deploy/controller/deployment/controller.go b/pkg/deploy/controller/deployment/controller.go new file mode 100644 index 000000000000..72959f4a2129 --- /dev/null +++ b/pkg/deploy/controller/deployment/controller.go @@ -0,0 +1,194 @@ +package deployment + +import ( + "fmt" + + "github.com/golang/glog" + + kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + kerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" + + deployapi "github.com/openshift/origin/pkg/deploy/api" + deployutil "github.com/openshift/origin/pkg/deploy/util" +) + +// DeploymentController starts a deployment by creating a deployer pod which +// implements a deployment strategy. The status of the deployment will follow +// the status of the deployer pod. The deployer pod is correlated to the +// deployment with annotations. +// +// When the deployment enters a terminal status: +// +// 1. If the deployment finished normally, the deployer pod is deleted. +// 2. If the deployment failed, the deployer pod is not deleted. +// +// Use the DeploymentControllerFactory to create this controller. +type DeploymentController struct { + // deploymentClient provides access to deployments. + deploymentClient deploymentClient + // podClient provides access to pods. + podClient podClient + // makeContainer knows how to make a container appropriate to execute a deployment strategy. + makeContainer func(strategy *deployapi.DeploymentStrategy) (*kapi.Container, error) + // decodeConfig knows how to decode the deploymentConfig from a deployment's annotations. + decodeConfig func(deployment *kapi.ReplicationController) (*deployapi.DeploymentConfig, error) +} + +// fatalError is an error which can't be retried. +type fatalError string + +func (e fatalError) Error() string { return "fatal error handling deployment: " + string(e) } + +// Handle processes deployment and either creates a deployer pod or responds +// to a terminal deployment status. +func (c *DeploymentController) Handle(deployment *kapi.ReplicationController) error { + currentStatus := statusFor(deployment) + nextStatus := currentStatus + + switch currentStatus { + case deployapi.DeploymentStatusNew: + podTemplate, err := c.makeDeployerPod(deployment) + if err != nil { + return fatalError(fmt.Sprintf("couldn't make deployer pod for %s: %v", labelForDeployment(deployment), err)) + } + + deploymentPod, err := c.podClient.createPod(deployment.Namespace, podTemplate) + if err != nil { + // If the pod already exists, it's possible that a previous CreatePod succeeded but + // the deployment state update failed and now we're re-entering. + if !kerrors.IsAlreadyExists(err) { + return fmt.Errorf("couldn't create deployer pod for %s: %v", labelForDeployment(deployment), err) + } + } else { + glog.V(2).Infof("Created pod %s for deployment %s", deploymentPod.Name, labelForDeployment(deployment)) + } + + deployment.Annotations[deployapi.DeploymentPodAnnotation] = deploymentPod.Name + nextStatus = deployapi.DeploymentStatusPending + case deployapi.DeploymentStatusPending, + deployapi.DeploymentStatusRunning, + deployapi.DeploymentStatusFailed: + glog.V(4).Infof("Ignoring deployment %s (status %s)", labelForDeployment(deployment), currentStatus) + case deployapi.DeploymentStatusComplete: + // Automatically clean up successful pods + // TODO: Could probably do a lookup here to skip the delete call, but it's not worth adding + // yet since (delete retries will only normally occur during full a re-sync). + podName := deployment.Annotations[deployapi.DeploymentPodAnnotation] + if err := c.podClient.deletePod(deployment.Namespace, podName); err != nil { + if !kerrors.IsNotFound(err) { + return fmt.Errorf("couldn't delete completed deployer pod %s/%s for deployment %s: %v", deployment.Namespace, podName, labelForDeployment(deployment), err) + } + // Already deleted + } else { + glog.V(4).Infof("Deleted completed deployer pod %s/%s for deployment %s", deployment.Namespace, podName, labelForDeployment(deployment)) + } + } + + if currentStatus != nextStatus { + deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(nextStatus) + if _, err := c.deploymentClient.updateDeployment(deployment.Namespace, deployment); err != nil { + return fmt.Errorf("couldn't update deployment %s to status %s: %v", labelForDeployment(deployment), nextStatus, err) + } + glog.V(2).Infof("Updated deployment %s status from %s to %s", labelForDeployment(deployment), currentStatus, nextStatus) + } + + return nil +} + +// makeDeployerPod creates a pod which implements deployment behavior. The pod is correlated to +// the deployment with an annotation. +func (c *DeploymentController) makeDeployerPod(deployment *kapi.ReplicationController) (*kapi.Pod, error) { + deploymentConfig, err := c.decodeConfig(deployment) + if err != nil { + return nil, err + } + + container, err := c.makeContainer(&deploymentConfig.Template.Strategy) + if err != nil { + return nil, err + } + + // Add deployment environment variables to the container. + envVars := []kapi.EnvVar{} + for _, env := range container.Env { + envVars = append(envVars, env) + } + envVars = append(envVars, kapi.EnvVar{Name: "OPENSHIFT_DEPLOYMENT_NAME", Value: deployment.Name}) + envVars = append(envVars, kapi.EnvVar{Name: "OPENSHIFT_DEPLOYMENT_NAMESPACE", Value: deployment.Namespace}) + + pod := &kapi.Pod{ + ObjectMeta: kapi.ObjectMeta{ + GenerateName: deployutil.DeployerPodNameForDeployment(deployment), + Annotations: map[string]string{ + deployapi.DeploymentAnnotation: deployment.Name, + }, + }, + Spec: kapi.PodSpec{ + Containers: []kapi.Container{ + { + Name: "deployment", + Command: container.Command, + Image: container.Image, + Env: envVars, + }, + }, + RestartPolicy: kapi.RestartPolicy{ + Never: &kapi.RestartPolicyNever{}, + }, + }, + } + + pod.Spec.Containers[0].ImagePullPolicy = kapi.PullIfNotPresent + + return pod, nil +} + +// labelForDeployment builds a string identifier for a DeploymentConfig. +func labelForDeployment(deployment *kapi.ReplicationController) string { + return fmt.Sprintf("%s/%s", deployment.Namespace, deployment.Name) +} + +// statusFor gets the DeploymentStatus for deployment from its annotations. +func statusFor(deployment *kapi.ReplicationController) deployapi.DeploymentStatus { + return deployapi.DeploymentStatus(deployment.Annotations[deployapi.DeploymentStatusAnnotation]) +} + +// deploymentClient abstracts access to deployments. +type deploymentClient interface { + getDeployment(namespace, name string) (*kapi.ReplicationController, error) + updateDeployment(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) +} + +// podClient abstracts access to pods. +type podClient interface { + createPod(namespace string, pod *kapi.Pod) (*kapi.Pod, error) + deletePod(namespace, name string) error +} + +// deploymentClientImpl is a pluggable deploymentClient. +type deploymentClientImpl struct { + getDeploymentFunc func(namespace, name string) (*kapi.ReplicationController, error) + updateDeploymentFunc func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) +} + +func (i *deploymentClientImpl) getDeployment(namespace, name string) (*kapi.ReplicationController, error) { + return i.getDeploymentFunc(namespace, name) +} + +func (i *deploymentClientImpl) updateDeployment(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + return i.updateDeploymentFunc(namespace, deployment) +} + +// podClientImpl is a pluggable podClient. +type podClientImpl struct { + createPodFunc func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) + deletePodFunc func(namespace, name string) error +} + +func (i *podClientImpl) createPod(namespace string, pod *kapi.Pod) (*kapi.Pod, error) { + return i.createPodFunc(namespace, pod) +} + +func (i *podClientImpl) deletePod(namespace, name string) error { + return i.deletePodFunc(namespace, name) +} diff --git a/pkg/deploy/controller/deployment/controller_test.go b/pkg/deploy/controller/deployment/controller_test.go new file mode 100644 index 000000000000..aaee59f4bfb0 --- /dev/null +++ b/pkg/deploy/controller/deployment/controller_test.go @@ -0,0 +1,407 @@ +package deployment + +import ( + "fmt" + "testing" + + kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + kerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" + + api "github.com/openshift/origin/pkg/api/latest" + deployapi "github.com/openshift/origin/pkg/deploy/api" + deploytest "github.com/openshift/origin/pkg/deploy/api/test" + deployutil "github.com/openshift/origin/pkg/deploy/util" +) + +// TestHandle_createPodOk ensures that a the deployer pod created in response +// to a new deployment is valid. +func TestHandle_createPodOk(t *testing.T) { + var ( + updatedDeployment *kapi.ReplicationController + createdPod *kapi.Pod + expectedContainer = okContainer() + ) + + controller := &DeploymentController{ + decodeConfig: func(deployment *kapi.ReplicationController) (*deployapi.DeploymentConfig, error) { + return deployutil.DecodeDeploymentConfig(deployment, api.Codec) + }, + deploymentClient: &deploymentClientImpl{ + updateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + updatedDeployment = deployment + return updatedDeployment, nil + }, + }, + podClient: &podClientImpl{ + createPodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) { + createdPod = pod + return pod, nil + }, + }, + makeContainer: func(strategy *deployapi.DeploymentStrategy) (*kapi.Container, error) { + return expectedContainer, nil + }, + } + + // Verify new -> pending + config := deploytest.OkDeploymentConfig(1) + deployment, _ := deployutil.MakeDeployment(config, kapi.Codec) + deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(deployapi.DeploymentStatusNew) + err := controller.Handle(deployment) + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if updatedDeployment == nil { + t.Fatalf("expected an updated deployment") + } + + if e, a := string(deployapi.DeploymentStatusPending), updatedDeployment.Annotations[deployapi.DeploymentStatusAnnotation]; e != a { + t.Fatalf("expected updated deployment status %s, got %s", e, a) + } + + if createdPod == nil { + t.Fatalf("expected a pod to be created") + } + + if _, hasPodAnnotation := updatedDeployment.Annotations[deployapi.DeploymentPodAnnotation]; !hasPodAnnotation { + t.Fatalf("missing deployment pod annotation") + } + + if e, a := createdPod.Name, updatedDeployment.Annotations[deployapi.DeploymentPodAnnotation]; e != a { + t.Fatalf("expected deployment pod annotation %s, got %s", e, a) + } + + if _, hasDeploymentAnnotation := createdPod.Annotations[deployapi.DeploymentAnnotation]; !hasDeploymentAnnotation { + t.Fatalf("missing deployment annotation") + } + + if e, a := updatedDeployment.Name, createdPod.Annotations[deployapi.DeploymentAnnotation]; e != a { + t.Fatalf("expected pod deployment annotation %s, got %s", e, a) + } + + actualContainer := createdPod.Spec.Containers[0] + + if e, a := expectedContainer.Image, actualContainer.Image; e != a { + t.Fatalf("expected container image %s, got %s", expectedContainer.Image, actualContainer.Image) + } + + if e, a := expectedContainer.Command[0], actualContainer.Command[0]; e != a { + t.Fatalf("expected container command %s, got %s", expectedContainer.Command[0], actualContainer.Command[0]) + } + + if e, a := expectedContainer.Env[0].Name, actualContainer.Env[0].Name; e != a { + t.Fatalf("expected container env name %s, got %s", expectedContainer.Env[0].Name, actualContainer.Env[0].Name) + } + + if e, a := expectedContainer.Env[0].Value, actualContainer.Env[0].Value; e != a { + t.Fatalf("expected container env value %s, got %s", expectedContainer.Env[0].Value, actualContainer.Env[0].Value) + } +} + +// TestHandle_makeContainerFail ensures that an internal (not API) failure to +// create a deployer pod results in a fatal error. +func TestHandle_makeContainerFail(t *testing.T) { + var updatedDeployment *kapi.ReplicationController + + controller := &DeploymentController{ + decodeConfig: func(deployment *kapi.ReplicationController) (*deployapi.DeploymentConfig, error) { + return deployutil.DecodeDeploymentConfig(deployment, api.Codec) + }, + deploymentClient: &deploymentClientImpl{ + updateDeploymentFunc: func(namspace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + updatedDeployment = deployment + return updatedDeployment, nil + }, + }, + podClient: &podClientImpl{ + createPodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) { + t.Fatalf("unexpected call to create pod") + return nil, nil + }, + }, + makeContainer: func(strategy *deployapi.DeploymentStrategy) (*kapi.Container, error) { + return nil, fmt.Errorf("couldn't make container") + }, + } + + config := deploytest.OkDeploymentConfig(1) + deployment, _ := deployutil.MakeDeployment(config, kapi.Codec) + deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(deployapi.DeploymentStatusNew) + err := controller.Handle(deployment) + + if err == nil { + t.Fatalf("expected an error") + } + + if _, isFatal := err.(fatalError); !isFatal { + t.Fatalf("expected a fatal error, got %v", err) + } +} + +// TestHandle_createPodFail ensures that an an API failure while creating a +// deployer pod results in a nonfatal error. +func TestHandle_createPodFail(t *testing.T) { + var updatedDeployment *kapi.ReplicationController + + controller := &DeploymentController{ + decodeConfig: func(deployment *kapi.ReplicationController) (*deployapi.DeploymentConfig, error) { + return deployutil.DecodeDeploymentConfig(deployment, api.Codec) + }, + deploymentClient: &deploymentClientImpl{ + updateDeploymentFunc: func(namspace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + updatedDeployment = deployment + return updatedDeployment, nil + }, + }, + podClient: &podClientImpl{ + createPodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) { + return nil, fmt.Errorf("Failed to create pod %s", pod.Name) + }, + }, + makeContainer: func(strategy *deployapi.DeploymentStrategy) (*kapi.Container, error) { + return okContainer(), nil + }, + } + + config := deploytest.OkDeploymentConfig(1) + deployment, _ := deployutil.MakeDeployment(config, kapi.Codec) + deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(deployapi.DeploymentStatusNew) + err := controller.Handle(deployment) + + if err == nil { + t.Fatalf("expected an error") + } + + if _, isFatal := err.(fatalError); isFatal { + t.Fatalf("expected a nonfatal error, got a %#v", err) + } +} + +// TestHandle_createPodAlreadyExists ensures that attempts to create a +// deployer pod which was already created don't result in an error +// (effectively skipping the handling as redundant). +func TestHandle_createPodAlreadyExists(t *testing.T) { + controller := &DeploymentController{ + decodeConfig: func(deployment *kapi.ReplicationController) (*deployapi.DeploymentConfig, error) { + return deployutil.DecodeDeploymentConfig(deployment, api.Codec) + }, + deploymentClient: &deploymentClientImpl{ + updateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + t.Fatalf("unexpected deployment update") + return nil, nil + }, + }, + podClient: &podClientImpl{ + createPodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) { + return nil, kerrors.NewAlreadyExists("Pod", pod.Name) + }, + }, + makeContainer: func(strategy *deployapi.DeploymentStrategy) (*kapi.Container, error) { + return okContainer(), nil + }, + } + + // Verify no-op + config := deploytest.OkDeploymentConfig(1) + deployment, _ := deployutil.MakeDeployment(config, kapi.Codec) + deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(deployapi.DeploymentStatusPending) + err := controller.Handle(deployment) + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } +} + +// TestHandle_noop ensures that pending, running, and failed states result in +// no action by the controller (as these represent in-progress or terminal +// states). +func TestHandle_noop(t *testing.T) { + controller := &DeploymentController{ + decodeConfig: func(deployment *kapi.ReplicationController) (*deployapi.DeploymentConfig, error) { + return deployutil.DecodeDeploymentConfig(deployment, api.Codec) + }, + deploymentClient: &deploymentClientImpl{ + updateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + t.Fatalf("unexpected deployment update") + return nil, nil + }, + }, + podClient: &podClientImpl{ + createPodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) { + t.Fatalf("unexpected call to create pod") + return nil, nil + }, + }, + makeContainer: func(strategy *deployapi.DeploymentStrategy) (*kapi.Container, error) { + t.Fatalf("unexpected call to make container") + return nil, nil + }, + } + + // Verify no-op + config := deploytest.OkDeploymentConfig(1) + deployment, _ := deployutil.MakeDeployment(config, kapi.Codec) + + noopStatus := []deployapi.DeploymentStatus{ + deployapi.DeploymentStatusPending, + deployapi.DeploymentStatusRunning, + deployapi.DeploymentStatusFailed, + } + for _, status := range noopStatus { + deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(status) + err := controller.Handle(deployment) + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + } +} + +// TestHandle_cleanupPodOk ensures that deployer pods are cleaned up for +// deployments in a completed state. +func TestHandle_cleanupPodOk(t *testing.T) { + podName := "pod" + deletedPodName := "" + deletedPodNamespace := "" + + controller := &DeploymentController{ + decodeConfig: func(deployment *kapi.ReplicationController) (*deployapi.DeploymentConfig, error) { + return deployutil.DecodeDeploymentConfig(deployment, api.Codec) + }, + deploymentClient: &deploymentClientImpl{ + updateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + t.Fatalf("unexpected deployment update") + return nil, nil + }, + }, + podClient: &podClientImpl{ + createPodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) { + t.Fatalf("unexpected call to create pod") + return nil, nil + }, + deletePodFunc: func(namespace, name string) error { + deletedPodNamespace = namespace + deletedPodName = name + return nil + }, + }, + makeContainer: func(strategy *deployapi.DeploymentStrategy) (*kapi.Container, error) { + t.Fatalf("unexpected call to make container") + return nil, nil + }, + } + + // Verify successful cleanup + config := deploytest.OkDeploymentConfig(1) + deployment, _ := deployutil.MakeDeployment(config, kapi.Codec) + deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(deployapi.DeploymentStatusComplete) + deployment.Annotations[deployapi.DeploymentPodAnnotation] = podName + err := controller.Handle(deployment) + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if e, a := deployment.Namespace, deletedPodNamespace; e != a { + t.Fatalf("expected deleted pod namespace %s, got %s", e, a) + } + + if e, a := podName, deletedPodName; e != a { + t.Fatalf("expected deleted pod name %s, got %s", e, a) + } +} + +// TestHandle_cleanupPodNoop ensures that repeated attempts to clean up an +// already-deleted deployer pod for a completed deployment safely do nothing. +func TestHandle_cleanupPodNoop(t *testing.T) { + controller := &DeploymentController{ + decodeConfig: func(deployment *kapi.ReplicationController) (*deployapi.DeploymentConfig, error) { + return deployutil.DecodeDeploymentConfig(deployment, api.Codec) + }, + deploymentClient: &deploymentClientImpl{ + updateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + t.Fatalf("unexpected deployment update") + return nil, nil + }, + }, + podClient: &podClientImpl{ + createPodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) { + t.Fatalf("unexpected call to create pod") + return nil, nil + }, + deletePodFunc: func(namespace, name string) error { + return kerrors.NewNotFound("Pod", name) + }, + }, + makeContainer: func(strategy *deployapi.DeploymentStrategy) (*kapi.Container, error) { + t.Fatalf("unexpected call to make container") + return nil, nil + }, + } + + // Verify no-op + config := deploytest.OkDeploymentConfig(1) + deployment, _ := deployutil.MakeDeployment(config, kapi.Codec) + deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(deployapi.DeploymentStatusComplete) + deployment.Annotations[deployapi.DeploymentPodAnnotation] = "pod" + err := controller.Handle(deployment) + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } +} + +// TestHandle_cleanupPodFail ensures that a failed attempt to clean up the +// deployer pod for a completed deployment results in a nonfatal error. +func TestHandle_cleanupPodFail(t *testing.T) { + controller := &DeploymentController{ + decodeConfig: func(deployment *kapi.ReplicationController) (*deployapi.DeploymentConfig, error) { + return deployutil.DecodeDeploymentConfig(deployment, api.Codec) + }, + deploymentClient: &deploymentClientImpl{ + updateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + t.Fatalf("unexpected deployment update") + return nil, nil + }, + }, + podClient: &podClientImpl{ + createPodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) { + t.Fatalf("unexpected call to create pod") + return nil, nil + }, + deletePodFunc: func(namespace, name string) error { + return kerrors.NewInternalError(fmt.Errorf("test error")) + }, + }, + makeContainer: func(strategy *deployapi.DeploymentStrategy) (*kapi.Container, error) { + t.Fatalf("unexpected call to make container") + return nil, nil + }, + } + + // Verify error + config := deploytest.OkDeploymentConfig(1) + deployment, _ := deployutil.MakeDeployment(config, kapi.Codec) + deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(deployapi.DeploymentStatusComplete) + deployment.Annotations[deployapi.DeploymentPodAnnotation] = "pod" + err := controller.Handle(deployment) + + if err == nil { + t.Fatalf("expected an error") + } +} + +func okContainer() *kapi.Container { + return &kapi.Container{ + Image: "test/image", + Command: []string{"command"}, + Env: []kapi.EnvVar{ + { + Name: "env1", + Value: "val1", + }, + }, + } +} diff --git a/pkg/deploy/controller/deployment/factory.go b/pkg/deploy/controller/deployment/factory.go new file mode 100644 index 000000000000..67ed0a69c09a --- /dev/null +++ b/pkg/deploy/controller/deployment/factory.go @@ -0,0 +1,124 @@ +package deployment + +import ( + "fmt" + + kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + kclient "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + kutil "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + + controller "github.com/openshift/origin/pkg/controller" + deployapi "github.com/openshift/origin/pkg/deploy/api" + deployutil "github.com/openshift/origin/pkg/deploy/util" +) + +// DeploymentControllerFactory can create a DeploymentController that creates +// deployer pods in a configurable way. +type DeploymentControllerFactory struct { + // KubeClient is a Kubernetes client. + KubeClient kclient.Interface + // Codec is used for encoding/decoding. + Codec runtime.Codec + // Environment is a set of environment which should be injected into all deployer pod containers. + Environment []kapi.EnvVar + // RecreateStrategyImage specifies which Docker image which should implement the Recreate strategy. + RecreateStrategyImage string +} + +// Create creates a DeploymentController. +func (factory *DeploymentControllerFactory) Create() controller.RunnableController { + deploymentLW := &deployutil.ListWatcherImpl{ + ListFunc: func() (runtime.Object, error) { + return factory.KubeClient.ReplicationControllers(kapi.NamespaceAll).List(labels.Everything()) + }, + WatchFunc: func(resourceVersion string) (watch.Interface, error) { + return factory.KubeClient.ReplicationControllers(kapi.NamespaceAll).Watch(labels.Everything(), labels.Everything(), resourceVersion) + }, + } + deploymentQueue := cache.NewFIFO(cache.MetaNamespaceKeyFunc) + cache.NewReflector(deploymentLW, &kapi.ReplicationController{}, deploymentQueue).Run() + + deployController := &DeploymentController{ + deploymentClient: &deploymentClientImpl{ + getDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { + return factory.KubeClient.ReplicationControllers(namespace).Get(name) + }, + updateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + return factory.KubeClient.ReplicationControllers(namespace).Update(deployment) + }, + }, + podClient: &podClientImpl{ + createPodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) { + return factory.KubeClient.Pods(namespace).Create(pod) + }, + deletePodFunc: func(namespace, name string) error { + return factory.KubeClient.Pods(namespace).Delete(name) + }, + }, + makeContainer: func(strategy *deployapi.DeploymentStrategy) (*kapi.Container, error) { + return factory.makeContainer(strategy) + }, + decodeConfig: func(deployment *kapi.ReplicationController) (*deployapi.DeploymentConfig, error) { + return deployutil.DecodeDeploymentConfig(deployment, factory.Codec) + }, + } + + return &controller.RetryController{ + Queue: deploymentQueue, + RetryManager: controller.NewQueueRetryManager(deploymentQueue, cache.MetaNamespaceKeyFunc, 1), + ShouldRetry: func(obj interface{}, err error) bool { + if _, isFatal := err.(fatalError); isFatal { + kutil.HandleError(err) + return false + } + return true + }, + Handle: func(obj interface{}) error { + deployment := obj.(*kapi.ReplicationController) + return deployController.Handle(deployment) + }, + } +} + +// makeContainer creates containers in the following way: +// +// 1. For the Recreate strategy, use the factory's RecreateStrategyImage as +// the container image, and the factory's Environment as the container +// environment. +// 2. For all Custom strategy, use the strategy's image for the container +// image, and use the combination of the factory's Environment and the +// strategy's environment as the container environment. +// +// An error is returned if the deployment strategy type is not supported. +func (factory *DeploymentControllerFactory) makeContainer(strategy *deployapi.DeploymentStrategy) (*kapi.Container, error) { + // Set default environment values + environment := []kapi.EnvVar{} + for _, env := range factory.Environment { + environment = append(environment, env) + } + + // Every strategy type should be handled here. + switch strategy.Type { + case deployapi.DeploymentStrategyTypeRecreate: + // Use the factory-configured image. + return &kapi.Container{ + Image: factory.RecreateStrategyImage, + Env: environment, + }, nil + case deployapi.DeploymentStrategyTypeCustom: + // Use user-defined values from the strategy input. + for _, env := range strategy.CustomParams.Environment { + environment = append(environment, env) + } + return &kapi.Container{ + Image: strategy.CustomParams.Image, + Env: environment, + }, nil + default: + return nil, fmt.Errorf("unsupported deployment strategy type: %s", strategy.Type) + } +} diff --git a/pkg/deploy/controller/deployment_controller.go b/pkg/deploy/controller/deployment_controller.go deleted file mode 100644 index 4f2bff015b5d..000000000000 --- a/pkg/deploy/controller/deployment_controller.go +++ /dev/null @@ -1,267 +0,0 @@ -package controller - -import ( - "fmt" - - "github.com/golang/glog" - - kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - kerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" - "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" - - deployapi "github.com/openshift/origin/pkg/deploy/api" - deployutil "github.com/openshift/origin/pkg/deploy/util" -) - -// DeploymentController performs a deployment by creating a pod which is defined by a strategy. -// The status of the resulting deployment will follow the status of the corresponding pod. -// -// Deployments are represented by a ReplicationController. -type DeploymentController struct { - // ContainerCreator makes the container for the deployment pod based on the strategy. - ContainerCreator DeploymentContainerCreator - // DeploymentClient provides access to deployments. - DeploymentClient DeploymentControllerDeploymentClient - // PodClient provides access to pods. - PodClient DeploymentControllerPodClient - // NextDeployment blocks until the next deployment is available. - NextDeployment func() *kapi.ReplicationController - // NextPod blocks until the next pod is available. - NextPod func() *kapi.Pod - // Environment is a set of environment which should be injected into all deployment pod - // containers, in addition to whatever environment is specified by the ContainerCreator. - Environment []kapi.EnvVar - // Codec is used to decode DeploymentConfigs. - Codec runtime.Codec - // Stop is an optional channel that controls when the controller exits. - Stop <-chan struct{} -} - -// Run begins watching and synchronizing deployment states. -func (dc *DeploymentController) Run() { - go util.Until(func() { - err := dc.HandleDeployment(dc.NextDeployment()) - if err != nil { - glog.Errorf("%v", err) - } - }, 0, dc.Stop) - - go util.Until(func() { - err := dc.HandlePod(dc.NextPod()) - if err != nil { - glog.Errorf("%v", err) - } - }, 0, dc.Stop) -} - -// HandleDeployment processes a new deployment and creates a new Pod which implements the specific -// deployment behavior. The deployment and pod are correlated with annotations. If the pod was -// successfully created, the deployment's status is transitioned to pending. -func (dc *DeploymentController) HandleDeployment(deployment *kapi.ReplicationController) error { - currentStatus := statusFor(deployment) - nextStatus := currentStatus - - switch currentStatus { - case deployapi.DeploymentStatusNew: - podTemplate, err := dc.makeDeployerPod(deployment) - if err != nil { - return fmt.Errorf("couldn't make deployer pod for %s: %v", labelForDeployment(deployment), err) - } - - deploymentPod, err := dc.PodClient.CreatePod(deployment.Namespace, podTemplate) - if err != nil { - // If the pod already exists, it's possible that a previous CreatePod succeeded but - // the deployment state update failed and now we're re-entering. - if !kerrors.IsAlreadyExists(err) { - return fmt.Errorf("couldn't create deployer pod for %s: %v", labelForDeployment(deployment), err) - } - } else { - glog.V(2).Infof("Created pod %s for deployment %s", deploymentPod.Name, labelForDeployment(deployment)) - } - - deployment.Annotations[deployapi.DeploymentPodAnnotation] = deploymentPod.Name - nextStatus = deployapi.DeploymentStatusPending - case deployapi.DeploymentStatusPending, - deployapi.DeploymentStatusRunning, - deployapi.DeploymentStatusFailed: - glog.V(4).Infof("Ignoring deployment %s (status %s)", labelForDeployment(deployment), currentStatus) - case deployapi.DeploymentStatusComplete: - // Automatically clean up successful pods - // TODO: Could probably do a lookup here to skip the delete call, but it's not worth adding - // yet since (delete retries will only normally occur during full a re-sync). - podName := deployment.Annotations[deployapi.DeploymentPodAnnotation] - if err := dc.PodClient.DeletePod(deployment.Namespace, podName); err != nil { - if !kerrors.IsNotFound(err) { - return fmt.Errorf("couldn't delete completed deployer pod %s/%s for deployment %s: %v", deployment.Namespace, podName, labelForDeployment(deployment), err) - } - // Already deleted - } else { - glog.V(4).Infof("Deleted completed deployer pod %s/%s for deployment %s", deployment.Namespace, podName, labelForDeployment(deployment)) - } - } - - if currentStatus != nextStatus { - deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(nextStatus) - if _, err := dc.DeploymentClient.UpdateDeployment(deployment.Namespace, deployment); err != nil { - return fmt.Errorf("Couldn't update deployment %s to status %s: %v", labelForDeployment(deployment), nextStatus, err) - } - glog.V(2).Infof("Updated deployment %s status from %s to %s", labelForDeployment(deployment), currentStatus, nextStatus) - } - - return nil -} - -// HandlePod reconciles a pod's current state with its associated deployment and updates the -// deployment appropriately. -func (dc *DeploymentController) HandlePod(pod *kapi.Pod) error { - // Verify the assumption that we'll be given only pods correlated to a deployment - deploymentName, hasDeploymentName := pod.Annotations[deployapi.DeploymentAnnotation] - if !hasDeploymentName { - glog.V(2).Infof("Ignoring pod %s; no deployment annotation found", pod.Name) - return nil - } - - deployment, deploymentErr := dc.DeploymentClient.GetDeployment(pod.Namespace, deploymentName) - if deploymentErr != nil { - return fmt.Errorf("couldn't get deployment %s/%s associated with pod %s", pod.Namespace, deploymentName, pod.Name) - } - - currentStatus := statusFor(deployment) - nextStatus := currentStatus - - switch pod.Status.Phase { - case kapi.PodRunning: - nextStatus = deployapi.DeploymentStatusRunning - case kapi.PodSucceeded, kapi.PodFailed: - nextStatus = deployapi.DeploymentStatusComplete - // Detect failure based on the container state - for _, info := range pod.Status.Info { - if info.State.Termination != nil && info.State.Termination.ExitCode != 0 { - nextStatus = deployapi.DeploymentStatusFailed - } - } - } - - if currentStatus != nextStatus { - deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(nextStatus) - if _, err := dc.DeploymentClient.UpdateDeployment(deployment.Namespace, deployment); err != nil { - return fmt.Errorf("couldn't update deployment %s to status %s: %v", labelForDeployment(deployment), nextStatus, err) - } - glog.V(2).Infof("Updated deployment %s status from %s to %s", labelForDeployment(deployment), currentStatus, nextStatus) - } - - return nil -} - -// makeDeployerPod creates a pod which implements deployment behavior. The pod is correlated to -// the deployment with an annotation. -func (dc *DeploymentController) makeDeployerPod(deployment *kapi.ReplicationController) (*kapi.Pod, error) { - var deploymentConfig *deployapi.DeploymentConfig - var decodeError error - if deploymentConfig, decodeError = deployutil.DecodeDeploymentConfig(deployment, dc.Codec); decodeError != nil { - return nil, decodeError - } - - container := dc.ContainerCreator.CreateContainer(&deploymentConfig.Template.Strategy) - - // Combine the container environment, controller environment, and deployment values into - // the pod's environment. - envVars := container.Env - envVars = append(envVars, kapi.EnvVar{Name: "OPENSHIFT_DEPLOYMENT_NAME", Value: deployment.Name}) - envVars = append(envVars, kapi.EnvVar{Name: "OPENSHIFT_DEPLOYMENT_NAMESPACE", Value: deployment.Namespace}) - for _, env := range dc.Environment { - envVars = append(envVars, env) - } - - pod := &kapi.Pod{ - ObjectMeta: kapi.ObjectMeta{ - GenerateName: deployutil.DeployerPodNameForDeployment(deployment), - Annotations: map[string]string{ - deployapi.DeploymentAnnotation: deployment.Name, - }, - }, - Spec: kapi.PodSpec{ - Containers: []kapi.Container{ - { - Name: "deployment", - Command: container.Command, - Image: container.Image, - Env: envVars, - }, - }, - RestartPolicy: kapi.RestartPolicy{ - Never: &kapi.RestartPolicyNever{}, - }, - }, - } - - pod.Spec.Containers[0].ImagePullPolicy = kapi.PullIfNotPresent - - return pod, nil -} - -// labelFor builds a string identifier for a DeploymentConfig. -func labelForDeployment(deployment *kapi.ReplicationController) string { - return fmt.Sprintf("%s/%s", deployment.Namespace, deployment.Name) -} - -// statusFor gets the DeploymentStatus for deployment from its annotations. -func statusFor(deployment *kapi.ReplicationController) deployapi.DeploymentStatus { - return deployapi.DeploymentStatus(deployment.Annotations[deployapi.DeploymentStatusAnnotation]) -} - -// DeploymentContainerCreator knows how to create a deployment pod's container based on -// the deployment's strategy. -type DeploymentContainerCreator interface { - CreateContainer(*deployapi.DeploymentStrategy) *kapi.Container -} - -// DeploymentControllerDeploymentClient abstracts access to deployments. -type DeploymentControllerDeploymentClient interface { - GetDeployment(namespace, name string) (*kapi.ReplicationController, error) - UpdateDeployment(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) -} - -// DeploymentControllerPodClient abstracts access to pods. -type DeploymentControllerPodClient interface { - CreatePod(namespace string, pod *kapi.Pod) (*kapi.Pod, error) - DeletePod(namespace, name string) error -} - -// DeploymentContainerCreatorImpl is a pluggable DeploymentContainerCreator. -type DeploymentContainerCreatorImpl struct { - CreateContainerFunc func(*deployapi.DeploymentStrategy) *kapi.Container -} - -func (i *DeploymentContainerCreatorImpl) CreateContainer(strategy *deployapi.DeploymentStrategy) *kapi.Container { - return i.CreateContainerFunc(strategy) -} - -// DeploymentControllerDeploymentClientImpl is a pluggable deploymentControllerDeploymentClient. -type DeploymentControllerDeploymentClientImpl struct { - GetDeploymentFunc func(namespace, name string) (*kapi.ReplicationController, error) - UpdateDeploymentFunc func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) -} - -func (i *DeploymentControllerDeploymentClientImpl) GetDeployment(namespace, name string) (*kapi.ReplicationController, error) { - return i.GetDeploymentFunc(namespace, name) -} - -func (i *DeploymentControllerDeploymentClientImpl) UpdateDeployment(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { - return i.UpdateDeploymentFunc(namespace, deployment) -} - -// deploymentControllerPodClientImpl is a pluggable deploymentControllerPodClient. -type DeploymentControllerPodClientImpl struct { - CreatePodFunc func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) - DeletePodFunc func(namespace, name string) error -} - -func (i *DeploymentControllerPodClientImpl) CreatePod(namespace string, pod *kapi.Pod) (*kapi.Pod, error) { - return i.CreatePodFunc(namespace, pod) -} - -func (i *DeploymentControllerPodClientImpl) DeletePod(namespace, name string) error { - return i.DeletePodFunc(namespace, name) -} diff --git a/pkg/deploy/controller/deployment_controller_test.go b/pkg/deploy/controller/deployment_controller_test.go deleted file mode 100644 index 6ef33b935986..000000000000 --- a/pkg/deploy/controller/deployment_controller_test.go +++ /dev/null @@ -1,532 +0,0 @@ -package controller - -import ( - "fmt" - "testing" - - kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - kerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" - - api "github.com/openshift/origin/pkg/api/latest" - deployapi "github.com/openshift/origin/pkg/deploy/api" - deploytest "github.com/openshift/origin/pkg/deploy/api/test" - deployutil "github.com/openshift/origin/pkg/deploy/util" -) - -func TestHandleNewDeploymentCreatePodOk(t *testing.T) { - var ( - updatedDeployment *kapi.ReplicationController - createdPod *kapi.Pod - expectedContainer = okContainer() - ) - - controller := &DeploymentController{ - Codec: api.Codec, - DeploymentClient: &DeploymentControllerDeploymentClientImpl{ - UpdateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { - updatedDeployment = deployment - return updatedDeployment, nil - }, - }, - PodClient: &DeploymentControllerPodClientImpl{ - CreatePodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) { - createdPod = pod - return pod, nil - }, - }, - ContainerCreator: &DeploymentContainerCreatorImpl{ - CreateContainerFunc: func(strategy *deployapi.DeploymentStrategy) *kapi.Container { - return expectedContainer - }, - }, - } - - // Verify new -> pending - config := deploytest.OkDeploymentConfig(1) - deployment, _ := deployutil.MakeDeployment(config, kapi.Codec) - deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(deployapi.DeploymentStatusNew) - err := controller.HandleDeployment(deployment) - - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - if updatedDeployment == nil { - t.Fatalf("expected an updated deployment") - } - - if e, a := string(deployapi.DeploymentStatusPending), updatedDeployment.Annotations[deployapi.DeploymentStatusAnnotation]; e != a { - t.Fatalf("expected updated deployment status %s, got %s", e, a) - } - - if createdPod == nil { - t.Fatalf("expected a pod to be created") - } - - if _, hasPodAnnotation := updatedDeployment.Annotations[deployapi.DeploymentPodAnnotation]; !hasPodAnnotation { - t.Fatalf("missing deployment pod annotation") - } - - if e, a := createdPod.Name, updatedDeployment.Annotations[deployapi.DeploymentPodAnnotation]; e != a { - t.Fatalf("expected deployment pod annotation %s, got %s", e, a) - } - - if _, hasDeploymentAnnotation := createdPod.Annotations[deployapi.DeploymentAnnotation]; !hasDeploymentAnnotation { - t.Fatalf("missing deployment annotation") - } - - if e, a := updatedDeployment.Name, createdPod.Annotations[deployapi.DeploymentAnnotation]; e != a { - t.Fatalf("expected pod deployment annotation %s, got %s", e, a) - } - - actualContainer := createdPod.Spec.Containers[0] - - if e, a := expectedContainer.Image, actualContainer.Image; e != a { - t.Fatalf("expected container image %s, got %s", expectedContainer.Image, actualContainer.Image) - } - - if e, a := expectedContainer.Command[0], actualContainer.Command[0]; e != a { - t.Fatalf("expected container command %s, got %s", expectedContainer.Command[0], actualContainer.Command[0]) - } - - if e, a := expectedContainer.Env[0].Name, actualContainer.Env[0].Name; e != a { - t.Fatalf("expected container env name %s, got %s", expectedContainer.Env[0].Name, actualContainer.Env[0].Name) - } - - if e, a := expectedContainer.Env[0].Value, actualContainer.Env[0].Value; e != a { - t.Fatalf("expected container env value %s, got %s", expectedContainer.Env[0].Value, actualContainer.Env[0].Value) - } -} - -func TestHandleNewDeploymentCreatePodFail(t *testing.T) { - var updatedDeployment *kapi.ReplicationController - - controller := &DeploymentController{ - Codec: api.Codec, - DeploymentClient: &DeploymentControllerDeploymentClientImpl{ - UpdateDeploymentFunc: func(namspace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { - updatedDeployment = deployment - return updatedDeployment, nil - }, - }, - PodClient: &DeploymentControllerPodClientImpl{ - CreatePodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) { - return nil, fmt.Errorf("Failed to create pod %s", pod.Name) - }, - }, - ContainerCreator: &DeploymentContainerCreatorImpl{ - CreateContainerFunc: func(strategy *deployapi.DeploymentStrategy) *kapi.Container { - return okContainer() - }, - }, - } - - config := deploytest.OkDeploymentConfig(1) - deployment, _ := deployutil.MakeDeployment(config, kapi.Codec) - deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(deployapi.DeploymentStatusNew) - err := controller.HandleDeployment(deployment) - - if err == nil { - t.Fatalf("expected an error") - } -} - -func TestHandleNewDeploymentCreatePodAlreadyExists(t *testing.T) { - controller := &DeploymentController{ - Codec: api.Codec, - DeploymentClient: &DeploymentControllerDeploymentClientImpl{ - UpdateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { - t.Fatalf("unexpected deployment update") - return nil, nil - }, - }, - PodClient: &DeploymentControllerPodClientImpl{ - CreatePodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) { - return nil, kerrors.NewAlreadyExists("Pod", pod.Name) - }, - }, - ContainerCreator: &DeploymentContainerCreatorImpl{ - CreateContainerFunc: func(strategy *deployapi.DeploymentStrategy) *kapi.Container { - return okContainer() - }, - }, - } - - // Verify no-op - config := deploytest.OkDeploymentConfig(1) - deployment, _ := deployutil.MakeDeployment(config, kapi.Codec) - deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(deployapi.DeploymentStatusPending) - err := controller.HandleDeployment(deployment) - - if err != nil { - t.Fatalf("unexpected error: %v", err) - } -} - -func TestHandleDeploymentNoops(t *testing.T) { - controller := &DeploymentController{ - Codec: api.Codec, - DeploymentClient: &DeploymentControllerDeploymentClientImpl{ - UpdateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { - t.Fatalf("unexpected deployment update") - return nil, nil - }, - }, - PodClient: &DeploymentControllerPodClientImpl{ - CreatePodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) { - t.Fatalf("unexpected call to create pod") - return nil, nil - }, - }, - ContainerCreator: &DeploymentContainerCreatorImpl{ - CreateContainerFunc: func(strategy *deployapi.DeploymentStrategy) *kapi.Container { - t.Fatalf("unexpected call to create container") - return nil - }, - }, - } - - // Verify no-op - config := deploytest.OkDeploymentConfig(1) - deployment, _ := deployutil.MakeDeployment(config, kapi.Codec) - - noopStatus := []deployapi.DeploymentStatus{ - deployapi.DeploymentStatusPending, - deployapi.DeploymentStatusRunning, - deployapi.DeploymentStatusFailed, - } - for _, status := range noopStatus { - deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(status) - err := controller.HandleDeployment(deployment) - - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - } -} - -func TestHandleDeploymentPodCleanupOk(t *testing.T) { - podName := "pod" - deletedPodName := "" - deletedPodNamespace := "" - - controller := &DeploymentController{ - Codec: api.Codec, - DeploymentClient: &DeploymentControllerDeploymentClientImpl{ - UpdateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { - t.Fatalf("unexpected deployment update") - return nil, nil - }, - }, - PodClient: &DeploymentControllerPodClientImpl{ - CreatePodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) { - t.Fatalf("unexpected call to create pod") - return nil, nil - }, - DeletePodFunc: func(namespace, name string) error { - deletedPodNamespace = namespace - deletedPodName = name - return nil - }, - }, - ContainerCreator: &DeploymentContainerCreatorImpl{ - CreateContainerFunc: func(strategy *deployapi.DeploymentStrategy) *kapi.Container { - t.Fatalf("unexpected call to create container") - return nil - }, - }, - } - - // Verify successful cleanup - config := deploytest.OkDeploymentConfig(1) - deployment, _ := deployutil.MakeDeployment(config, kapi.Codec) - deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(deployapi.DeploymentStatusComplete) - deployment.Annotations[deployapi.DeploymentPodAnnotation] = podName - err := controller.HandleDeployment(deployment) - - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - if e, a := deployment.Namespace, deletedPodNamespace; e != a { - t.Fatalf("expected deleted pod namespace %s, got %s", e, a) - } - - if e, a := podName, deletedPodName; e != a { - t.Fatalf("expected deleted pod name %s, got %s", e, a) - } -} - -func TestHandleDeploymentPodCleanupNoop(t *testing.T) { - controller := &DeploymentController{ - Codec: api.Codec, - DeploymentClient: &DeploymentControllerDeploymentClientImpl{ - UpdateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { - t.Fatalf("unexpected deployment update") - return nil, nil - }, - }, - PodClient: &DeploymentControllerPodClientImpl{ - CreatePodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) { - t.Fatalf("unexpected call to create pod") - return nil, nil - }, - DeletePodFunc: func(namespace, name string) error { - return kerrors.NewNotFound("Pod", name) - }, - }, - ContainerCreator: &DeploymentContainerCreatorImpl{ - CreateContainerFunc: func(strategy *deployapi.DeploymentStrategy) *kapi.Container { - t.Fatalf("unexpected call to create container") - return nil - }, - }, - } - - // Verify no-op - config := deploytest.OkDeploymentConfig(1) - deployment, _ := deployutil.MakeDeployment(config, kapi.Codec) - deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(deployapi.DeploymentStatusComplete) - deployment.Annotations[deployapi.DeploymentPodAnnotation] = "pod" - err := controller.HandleDeployment(deployment) - - if err != nil { - t.Fatalf("unexpected error: %v", err) - } -} - -func TestHandleDeploymentPodCleanupFailure(t *testing.T) { - controller := &DeploymentController{ - Codec: api.Codec, - DeploymentClient: &DeploymentControllerDeploymentClientImpl{ - UpdateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { - t.Fatalf("unexpected deployment update") - return nil, nil - }, - }, - PodClient: &DeploymentControllerPodClientImpl{ - CreatePodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) { - t.Fatalf("unexpected call to create pod") - return nil, nil - }, - DeletePodFunc: func(namespace, name string) error { - return kerrors.NewInternalError(fmt.Errorf("test error")) - }, - }, - ContainerCreator: &DeploymentContainerCreatorImpl{ - CreateContainerFunc: func(strategy *deployapi.DeploymentStrategy) *kapi.Container { - t.Fatalf("unexpected call to create container") - return nil - }, - }, - } - - // Verify error - config := deploytest.OkDeploymentConfig(1) - deployment, _ := deployutil.MakeDeployment(config, kapi.Codec) - deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(deployapi.DeploymentStatusComplete) - deployment.Annotations[deployapi.DeploymentPodAnnotation] = "pod" - err := controller.HandleDeployment(deployment) - - if err == nil { - t.Fatalf("expected an error") - } -} - -func TestHandleUncorrelatedPod(t *testing.T) { - controller := &DeploymentController{ - Codec: api.Codec, - DeploymentClient: &DeploymentControllerDeploymentClientImpl{ - UpdateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { - t.Fatalf("unexpected deployment update") - return nil, nil - }, - }, - } - - // Verify no-op - pod := runningPod() - pod.Annotations = make(map[string]string) - err := controller.HandlePod(pod) - - if err != nil { - t.Fatalf("unexpected err: %v", err) - } -} - -func TestHandleOrphanedPod(t *testing.T) { - controller := &DeploymentController{ - Codec: api.Codec, - DeploymentClient: &DeploymentControllerDeploymentClientImpl{ - UpdateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { - t.Fatalf("Unexpected deployment update") - return nil, nil - }, - GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { - return nil, kerrors.NewNotFound("ReplicationController", name) - }, - }, - } - - err := controller.HandlePod(runningPod()) - - if err == nil { - t.Fatalf("expected an error") - } -} - -func TestHandlePodRunning(t *testing.T) { - var updatedDeployment *kapi.ReplicationController - - controller := &DeploymentController{ - Codec: api.Codec, - DeploymentClient: &DeploymentControllerDeploymentClientImpl{ - GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { - config := deploytest.OkDeploymentConfig(1) - deployment, _ := deployutil.MakeDeployment(config, kapi.Codec) - deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(deployapi.DeploymentStatusPending) - return deployment, nil - }, - UpdateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { - updatedDeployment = deployment - return deployment, nil - }, - }, - } - - err := controller.HandlePod(runningPod()) - - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - if updatedDeployment == nil { - t.Fatalf("expected deployment update") - } - - if e, a := deployapi.DeploymentStatusRunning, statusFor(updatedDeployment); e != a { - t.Fatalf("expected updated deployment status %s, got %s", e, a) - } -} - -func TestHandlePodTerminatedOk(t *testing.T) { - var updatedDeployment *kapi.ReplicationController - - controller := &DeploymentController{ - Codec: api.Codec, - DeploymentClient: &DeploymentControllerDeploymentClientImpl{ - GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { - config := deploytest.OkDeploymentConfig(1) - deployment, _ := deployutil.MakeDeployment(config, kapi.Codec) - deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(deployapi.DeploymentStatusRunning) - return deployment, nil - }, - UpdateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { - updatedDeployment = deployment - return deployment, nil - }, - }, - } - - err := controller.HandlePod(succeededPod()) - - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - if updatedDeployment == nil { - t.Fatalf("expected deployment update") - } - - if e, a := deployapi.DeploymentStatusComplete, statusFor(updatedDeployment); e != a { - t.Fatalf("expected updated deployment status %s, got %s", e, a) - } -} - -func TestHandlePodTerminatedNotOk(t *testing.T) { - var updatedDeployment *kapi.ReplicationController - - controller := &DeploymentController{ - Codec: api.Codec, - DeploymentClient: &DeploymentControllerDeploymentClientImpl{ - GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { - config := deploytest.OkDeploymentConfig(1) - deployment, _ := deployutil.MakeDeployment(config, kapi.Codec) - deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(deployapi.DeploymentStatusRunning) - return deployment, nil - }, - UpdateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { - updatedDeployment = deployment - return deployment, nil - }, - }, - } - - err := controller.HandlePod(failedPod()) - - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - if updatedDeployment == nil { - t.Fatalf("expected deployment update") - } - - if e, a := deployapi.DeploymentStatusFailed, statusFor(updatedDeployment); e != a { - t.Fatalf("expected updated deployment status %s, got %s", e, a) - } -} - -func okContainer() *kapi.Container { - return &kapi.Container{ - Image: "test/image", - Command: []string{"command"}, - Env: []kapi.EnvVar{ - { - Name: "env1", - Value: "val1", - }, - }, - } -} - -func okPod() *kapi.Pod { - return &kapi.Pod{ - ObjectMeta: kapi.ObjectMeta{ - Name: "deploy-deploy1", - Annotations: map[string]string{ - deployapi.DeploymentAnnotation: "1234", - }, - }, - Status: kapi.PodStatus{ - Info: kapi.PodInfo{ - "container1": kapi.ContainerStatus{}, - }, - }, - } -} - -func succeededPod() *kapi.Pod { - p := okPod() - p.Status.Phase = kapi.PodSucceeded - return p -} - -func failedPod() *kapi.Pod { - p := okPod() - p.Status.Phase = kapi.PodFailed - p.Status.Info["container1"] = kapi.ContainerStatus{ - State: kapi.ContainerState{ - Termination: &kapi.ContainerStateTerminated{ - ExitCode: 1, - }, - }, - } - return p -} - -func runningPod() *kapi.Pod { - p := okPod() - p.Status.Phase = kapi.PodRunning - return p -} diff --git a/pkg/deploy/controller/deploymentconfig/deployment_config_controller.go b/pkg/deploy/controller/deploymentconfig/controller.go similarity index 90% rename from pkg/deploy/controller/deploymentconfig/deployment_config_controller.go rename to pkg/deploy/controller/deploymentconfig/controller.go index 849d5688b438..a4fa090f7cc7 100644 --- a/pkg/deploy/controller/deploymentconfig/deployment_config_controller.go +++ b/pkg/deploy/controller/deploymentconfig/controller.go @@ -22,10 +22,10 @@ import ( // // Use the DeploymentConfigControllerFactory to create this controller. type DeploymentConfigController struct { - // deploymentClient provides access to Deployments. + // deploymentClient provides access to deployments. deploymentClient deploymentClient - // makeDeployment creates a Deployment from a DeploymentConfig. - makeDeployment makeDeployment + // makeDeployment knows how to make a deployment from a config. + makeDeployment func(*deployapi.DeploymentConfig) (*kapi.ReplicationController, error) } // fatalError is an error which can't be retried. @@ -33,7 +33,7 @@ type fatalError string func (e fatalError) Error() string { return "fatal error handling deploymentConfig: " + string(e) } -// Handle creates a new deployment for config as necessary. +// Handle processes config and creates a new deployment if necessary. func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig) error { // Only deploy when the version has advanced past 0. if config.LatestVersion == 0 { @@ -98,6 +98,3 @@ func (i *deploymentClientImpl) getDeployment(namespace, name string) (*kapi.Repl func (i *deploymentClientImpl) createDeployment(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { return i.createDeploymentFunc(namespace, deployment) } - -// makeDeployment knows how to make a deployment from a config. -type makeDeployment func(*deployapi.DeploymentConfig) (*kapi.ReplicationController, error) diff --git a/pkg/deploy/controller/deploymentconfig/deployment_config_controller_test.go b/pkg/deploy/controller/deploymentconfig/controller_test.go similarity index 86% rename from pkg/deploy/controller/deploymentconfig/deployment_config_controller_test.go rename to pkg/deploy/controller/deploymentconfig/controller_test.go index 0dbb1b9d9ec7..91af874e3e5f 100644 --- a/pkg/deploy/controller/deploymentconfig/deployment_config_controller_test.go +++ b/pkg/deploy/controller/deploymentconfig/controller_test.go @@ -13,6 +13,8 @@ import ( deployutil "github.com/openshift/origin/pkg/deploy/util" ) +// TestHandle_initialOk ensures that an initial config (version 0) doesn't result +// in a new deployment. func TestHandle_initialOk(t *testing.T) { controller := &DeploymentConfigController{ makeDeployment: func(config *deployapi.DeploymentConfig) (*kapi.ReplicationController, error) { @@ -37,6 +39,8 @@ func TestHandle_initialOk(t *testing.T) { } } +// TestHandle_updateOk ensures that an updated config (version >0) with no +// existing deployment will result in a new deployment. func TestHandle_updateOk(t *testing.T) { deploymentConfig := deploytest.OkDeploymentConfig(1) var deployed *kapi.ReplicationController @@ -67,6 +71,8 @@ func TestHandle_updateOk(t *testing.T) { } } +// TestHandle_nonfatalLookupError ensures that an API failure to look up the +// existing deployment for an updated config results in a nonfatal error. func TestHandle_nonfatalLookupError(t *testing.T) { configController := &DeploymentConfigController{ makeDeployment: func(config *deployapi.DeploymentConfig) (*kapi.ReplicationController, error) { @@ -92,6 +98,9 @@ func TestHandle_nonfatalLookupError(t *testing.T) { } } +// TestHandle_configAlreadyDeployed ensures that an attempt to create a +// deployment for an updated config for which the deployment was already +// created results in a no-op. func TestHandle_configAlreadyDeployed(t *testing.T) { deploymentConfig := deploytest.OkDeploymentConfig(0) @@ -118,6 +127,8 @@ func TestHandle_configAlreadyDeployed(t *testing.T) { } } +// TestHandle_nonfatalCreateError ensures that a failed API attempt to create +// a new deployment for an updated config results in a nonfatal error. func TestHandle_nonfatalCreateError(t *testing.T) { configController := &DeploymentConfigController{ makeDeployment: func(config *deployapi.DeploymentConfig) (*kapi.ReplicationController, error) { @@ -138,10 +149,12 @@ func TestHandle_nonfatalCreateError(t *testing.T) { t.Fatalf("expected error") } if _, isFatal := err.(fatalError); isFatal { - t.Fatalf("expected a retryable error, got a fatal error: %v", err) + t.Fatalf("expected a nonfatal error, got a fatal error: %v", err) } } +// TestHandle_fatalError ensures that in internal (not API) failure to make a +// deployment from an updated config results in a fatal error. func TestHandle_fatalError(t *testing.T) { configController := &DeploymentConfigController{ makeDeployment: func(config *deployapi.DeploymentConfig) (*kapi.ReplicationController, error) { diff --git a/pkg/deploy/controller/deploymentconfig/factory.go b/pkg/deploy/controller/deploymentconfig/factory.go index b5cfa85d3e01..5ce3295b86d7 100644 --- a/pkg/deploy/controller/deploymentconfig/factory.go +++ b/pkg/deploy/controller/deploymentconfig/factory.go @@ -18,11 +18,15 @@ import ( // DeploymentConfigControllerFactory can create a DeploymentConfigController which obtains // DeploymentConfigs from a queue populated from a watch of all DeploymentConfigs. type DeploymentConfigControllerFactory struct { - Client *osclient.Client + // Client is an OpenShift client. + Client osclient.Interface + // KubeClient is a Kubernetes client. KubeClient kclient.Interface - Codec runtime.Codec + // Codec is used to encode/decode. + Codec runtime.Codec } +// Create creates a DeploymentConfigController. func (factory *DeploymentConfigControllerFactory) Create() controller.RunnableController { deploymentConfigLW := &deployutil.ListWatcherImpl{ ListFunc: func() (runtime.Object, error) { @@ -54,9 +58,9 @@ func (factory *DeploymentConfigControllerFactory) Create() controller.RunnableCo RetryManager: controller.NewQueueRetryManager(queue, cache.MetaNamespaceKeyFunc, 1), ShouldRetry: func(obj interface{}, err error) bool { if _, isFatal := err.(fatalError); isFatal { + kutil.HandleError(err) return false } - kutil.HandleError(err) return true }, Handle: func(obj interface{}) error { diff --git a/pkg/deploy/controller/factory/doc.go b/pkg/deploy/controller/factory/doc.go deleted file mode 100644 index 8c729ff8d9fe..000000000000 --- a/pkg/deploy/controller/factory/doc.go +++ /dev/null @@ -1,2 +0,0 @@ -// Package factory contains code used to create deployment controllers. -package factory diff --git a/pkg/deploy/controller/factory/factory.go b/pkg/deploy/controller/factory/factory.go deleted file mode 100644 index 55082c33ee50..000000000000 --- a/pkg/deploy/controller/factory/factory.go +++ /dev/null @@ -1,301 +0,0 @@ -package factory - -import ( - "time" - - "github.com/golang/glog" - - kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - kerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" - kclient "github.com/GoogleCloudPlatform/kubernetes/pkg/client" - "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" - "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" - "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" - - osclient "github.com/openshift/origin/pkg/client" - deployapi "github.com/openshift/origin/pkg/deploy/api" - deploycontroller "github.com/openshift/origin/pkg/deploy/controller" - deployutil "github.com/openshift/origin/pkg/deploy/util" - imageapi "github.com/openshift/origin/pkg/image/api" -) - -// DeploymentControllerFactory can create a DeploymentController which obtains Deployments -// from a queue populated from a watch of Deployments. -// Pods are obtained from a queue populated from a watch of all pods. -type DeploymentControllerFactory struct { - // Client satisfies DeploymentInterface. - Client *osclient.Client - // KubeClient satisfies PodInterface. - KubeClient *kclient.Client - // Environment is a set of environment which should be injected into all deployment pod containers. - Environment []kapi.EnvVar - // RecreateStrategyImage specifies which Docker image which should implement the Recreate strategy. - RecreateStrategyImage string - // Codec is used to decode DeploymentConfigs. - Codec runtime.Codec - // Stop may be set to allow controllers created by this factory to be terminated. - Stop <-chan struct{} -} - -func (factory *DeploymentControllerFactory) Create() *deploycontroller.DeploymentController { - deploymentLW := &deployutil.ListWatcherImpl{ - ListFunc: func() (runtime.Object, error) { - return factory.KubeClient.ReplicationControllers(kapi.NamespaceAll).List(labels.Everything()) - }, - WatchFunc: func(resourceVersion string) (watch.Interface, error) { - return factory.KubeClient.ReplicationControllers(kapi.NamespaceAll).Watch(labels.Everything(), labels.Everything(), resourceVersion) - }, - } - deploymentQueue := cache.NewFIFO(cache.MetaNamespaceKeyFunc) - cache.NewReflector(deploymentLW, &kapi.ReplicationController{}, deploymentQueue).RunUntil(factory.Stop) - - deploymentStore := cache.NewStore(cache.MetaNamespaceKeyFunc) - cache.NewReflector(deploymentLW, &kapi.ReplicationController{}, deploymentStore).RunUntil(factory.Stop) - - // Kubernetes does not currently synchronize Pod status in storage with a Pod's container - // states. Because of this, we can't receive events related to container (and thus Pod) - // state changes, such as Running -> Terminated. As a workaround, populate the FIFO with - // a polling implementation which relies on client calls to list Pods - the Get/List - // REST implementations will populate the synchronized container/pod status on-demand. - // - // TODO: Find a way to get watch events for Pod/container status updates. The polling - // strategy is horribly inefficient and should be addressed upstream somehow. - podQueue := cache.NewFIFO(cache.MetaNamespaceKeyFunc) - pollFunc := func() (cache.Enumerator, error) { - return pollPods(deploymentStore, factory.KubeClient) - } - cache.NewPoller(pollFunc, 10*time.Second, podQueue).RunUntil(factory.Stop) - - return &deploycontroller.DeploymentController{ - ContainerCreator: &defaultContainerCreator{factory.RecreateStrategyImage}, - DeploymentClient: &deploycontroller.DeploymentControllerDeploymentClientImpl{ - // Since we need to use a deployment cache to support the pod poller, go ahead and use - // it for other deployment lookups and maintain the usual REST API for not-found errors. - GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { - example := &kapi.ReplicationController{ - ObjectMeta: kapi.ObjectMeta{ - Namespace: namespace, - Name: name, - }} - obj, exists, err := deploymentStore.Get(example) - if !exists { - return nil, kerrors.NewNotFound(example.Kind, name) - } - if err != nil { - return nil, err - } - return obj.(*kapi.ReplicationController), nil - }, - UpdateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { - return factory.KubeClient.ReplicationControllers(namespace).Update(deployment) - }, - }, - PodClient: &deploycontroller.DeploymentControllerPodClientImpl{ - CreatePodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) { - return factory.KubeClient.Pods(namespace).Create(pod) - }, - DeletePodFunc: func(namespace, name string) error { - return factory.KubeClient.Pods(namespace).Delete(name) - }, - }, - Environment: factory.Environment, - NextDeployment: func() *kapi.ReplicationController { - deployment := deploymentQueue.Pop().(*kapi.ReplicationController) - panicIfStopped(factory.Stop, "deployment controller stopped") - return deployment - }, - NextPod: func() *kapi.Pod { - pod := podQueue.Pop().(*kapi.Pod) - panicIfStopped(factory.Stop, "deployment controller stopped") - return pod - }, - Codec: factory.Codec, - Stop: factory.Stop, - } -} - -// CreateContainer is the default DeploymentContainerCreator. It makes containers using only -// the input strategy parameters and a user defined image for the Recreate strategy. -type defaultContainerCreator struct { - recreateStrategyImage string -} - -func (c *defaultContainerCreator) CreateContainer(strategy *deployapi.DeploymentStrategy) *kapi.Container { - // Every strategy type should be handled here. - switch strategy.Type { - case deployapi.DeploymentStrategyTypeRecreate: - // Use the factory-configured image. - return &kapi.Container{ - Image: c.recreateStrategyImage, - } - case deployapi.DeploymentStrategyTypeCustom: - // Use user-defined values from the strategy input. - return &kapi.Container{ - Image: strategy.CustomParams.Image, - Env: strategy.CustomParams.Environment, - } - default: - // TODO: This shouldn't be reachable. Improve error handling. - glog.Errorf("Unsupported deployment strategy type %s", strategy.Type) - return nil - } -} - -// pollPods lists all pods associated with pending or running deployments and returns -// a cache.Enumerator suitable for use with a cache.Poller. -func pollPods(deploymentStore cache.Store, kClient kclient.PodsNamespacer) (cache.Enumerator, error) { - list := &kapi.PodList{} - - for _, obj := range deploymentStore.List() { - deployment := obj.(*kapi.ReplicationController) - - switch deployapi.DeploymentStatus(deployment.Annotations[deployapi.DeploymentStatusAnnotation]) { - case deployapi.DeploymentStatusPending, deployapi.DeploymentStatusRunning: - // Validate the correlating pod annotation - podID, hasPodID := deployment.Annotations[deployapi.DeploymentPodAnnotation] - if !hasPodID { - glog.V(2).Infof("Unexpected state: Deployment %s has no pod annotation; skipping pod polling", deployment.Name) - continue - } - - pod, err := kClient.Pods(deployment.Namespace).Get(podID) - if err != nil { - glog.V(2).Infof("Couldn't find pod %s for deployment %s: %#v", podID, deployment.Name, err) - continue - } - - list.Items = append(list.Items, *pod) - } - } - - return &podEnumerator{list}, nil -} - -// podEnumerator allows a cache.Poller to enumerate items in an api.PodList -type podEnumerator struct { - *kapi.PodList -} - -// Len returns the number of items in the pod list. -func (pe *podEnumerator) Len() int { - if pe.PodList == nil { - return 0 - } - return len(pe.Items) -} - -// Get returns the item (and ID) with the particular index. -func (pe *podEnumerator) Get(index int) interface{} { - return &pe.Items[index] -} - -// DeploymentConfigChangeControllerFactory can create a DeploymentConfigChangeController which obtains DeploymentConfigs -// from a queue populated from a watch of all DeploymentConfigs. -type DeploymentConfigChangeControllerFactory struct { - Client osclient.Interface - KubeClient kclient.Interface - Codec runtime.Codec - // Stop may be set to allow controllers created by this factory to be terminated. - Stop <-chan struct{} -} - -func (factory *DeploymentConfigChangeControllerFactory) Create() *deploycontroller.DeploymentConfigChangeController { - deploymentConfigLW := &deployutil.ListWatcherImpl{ - ListFunc: func() (runtime.Object, error) { - return factory.Client.DeploymentConfigs(kapi.NamespaceAll).List(labels.Everything(), labels.Everything()) - }, - WatchFunc: func(resourceVersion string) (watch.Interface, error) { - return factory.Client.DeploymentConfigs(kapi.NamespaceAll).Watch(labels.Everything(), labels.Everything(), resourceVersion) - }, - } - queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc) - cache.NewReflector(deploymentConfigLW, &deployapi.DeploymentConfig{}, queue).RunUntil(factory.Stop) - - return &deploycontroller.DeploymentConfigChangeController{ - ChangeStrategy: &deploycontroller.ChangeStrategyImpl{ - GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { - return factory.KubeClient.ReplicationControllers(namespace).Get(name) - }, - GenerateDeploymentConfigFunc: func(namespace, name string) (*deployapi.DeploymentConfig, error) { - return factory.Client.DeploymentConfigs(namespace).Generate(name) - }, - UpdateDeploymentConfigFunc: func(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { - return factory.Client.DeploymentConfigs(namespace).Update(config) - }, - }, - NextDeploymentConfig: func() *deployapi.DeploymentConfig { - config := queue.Pop().(*deployapi.DeploymentConfig) - panicIfStopped(factory.Stop, "deployment config change controller stopped") - return config - }, - Codec: factory.Codec, - Stop: factory.Stop, - } -} - -// ImageChangeControllerFactory can create an ImageChangeController which obtains ImageRepositories -// from a queue populated from a watch of all ImageRepositories. -type ImageChangeControllerFactory struct { - Client *osclient.Client - // Stop may be set to allow controllers created by this factory to be terminated. - Stop <-chan struct{} -} - -func (factory *ImageChangeControllerFactory) Create() *deploycontroller.ImageChangeController { - imageRepositoryLW := &deployutil.ListWatcherImpl{ - ListFunc: func() (runtime.Object, error) { - return factory.Client.ImageRepositories(kapi.NamespaceAll).List(labels.Everything(), labels.Everything()) - }, - WatchFunc: func(resourceVersion string) (watch.Interface, error) { - return factory.Client.ImageRepositories(kapi.NamespaceAll).Watch(labels.Everything(), labels.Everything(), resourceVersion) - }, - } - queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc) - cache.NewReflector(imageRepositoryLW, &imageapi.ImageRepository{}, queue).RunUntil(factory.Stop) - - deploymentConfigLW := &deployutil.ListWatcherImpl{ - ListFunc: func() (runtime.Object, error) { - return factory.Client.DeploymentConfigs(kapi.NamespaceAll).List(labels.Everything(), labels.Everything()) - }, - WatchFunc: func(resourceVersion string) (watch.Interface, error) { - return factory.Client.DeploymentConfigs(kapi.NamespaceAll).Watch(labels.Everything(), labels.Everything(), resourceVersion) - }, - } - store := cache.NewStore(cache.MetaNamespaceKeyFunc) - cache.NewReflector(deploymentConfigLW, &deployapi.DeploymentConfig{}, store).RunUntil(factory.Stop) - - return &deploycontroller.ImageChangeController{ - DeploymentConfigClient: &deploycontroller.ImageChangeControllerDeploymentConfigClientImpl{ - ListDeploymentConfigsFunc: func() ([]*deployapi.DeploymentConfig, error) { - configs := []*deployapi.DeploymentConfig{} - objs := store.List() - for _, obj := range objs { - configs = append(configs, obj.(*deployapi.DeploymentConfig)) - } - return configs, nil - }, - GenerateDeploymentConfigFunc: func(namespace, name string) (*deployapi.DeploymentConfig, error) { - return factory.Client.DeploymentConfigs(namespace).Generate(name) - }, - UpdateDeploymentConfigFunc: func(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { - return factory.Client.DeploymentConfigs(namespace).Update(config) - }, - }, - NextImageRepository: func() *imageapi.ImageRepository { - repo := queue.Pop().(*imageapi.ImageRepository) - panicIfStopped(factory.Stop, "deployment config change controller stopped") - return repo - }, - Stop: factory.Stop, - } -} - -// panicIfStopped panics with the provided object if the channel is closed -func panicIfStopped(ch <-chan struct{}, message interface{}) { - select { - case <-ch: - panic(message) - default: - } -} diff --git a/pkg/deploy/controller/image_change_controller.go b/pkg/deploy/controller/imagechange/controller.go similarity index 71% rename from pkg/deploy/controller/image_change_controller.go rename to pkg/deploy/controller/imagechange/controller.go index d6c8b719504f..8c8e8f0f3010 100644 --- a/pkg/deploy/controller/image_change_controller.go +++ b/pkg/deploy/controller/imagechange/controller.go @@ -1,4 +1,4 @@ -package controller +package imagechange import ( "fmt" @@ -11,32 +11,25 @@ import ( imageapi "github.com/openshift/origin/pkg/image/api" ) -// ImageChangeController watches for changes to ImageRepositories and regenerates -// DeploymentConfigs when a new version of a tag referenced by a DeploymentConfig -// is available. +// ImageChangeController increments the version of a DeploymentConfig which has an image +// change trigger when a tag update to a triggered ImageRepository is detected. +// +// Use the ImageChangeControllerFactory to create this controller. type ImageChangeController struct { - DeploymentConfigClient ImageChangeControllerDeploymentConfigClient - NextImageRepository func() *imageapi.ImageRepository - // Stop is an optional channel that controls when the controller exits - Stop <-chan struct{} + deploymentConfigClient deploymentConfigClient } -// Run processes ImageRepository events one by one. -func (c *ImageChangeController) Run() { - go util.Until(func() { - err := c.HandleImageRepo(c.NextImageRepository()) - if err != nil { - glog.Errorf("%v", err) - } - }, 0, c.Stop) -} +// fatalError is an error which can't be retried. +type fatalError string -// HandleImageRepo processes the next ImageRepository event. -func (c *ImageChangeController) HandleImageRepo(imageRepo *imageapi.ImageRepository) error { +func (e fatalError) Error() string { return "fatal error handling imageRepository: " + string(e) } + +// Handle processes image change triggers associated with imageRepo. +func (c *ImageChangeController) Handle(imageRepo *imageapi.ImageRepository) error { configsToGenerate := []*deployapi.DeploymentConfig{} firedTriggersForConfig := make(map[string][]deployapi.DeploymentTriggerImageChangeParams) - configs, err := c.DeploymentConfigClient.ListDeploymentConfigs() + configs, err := c.deploymentConfigClient.listDeploymentConfigs() if err != nil { return fmt.Errorf("couldn't get list of deploymentConfigs while handling imageRepo %s: %v", labelForRepo(imageRepo), err) } @@ -91,7 +84,7 @@ func (c *ImageChangeController) HandleImageRepo(imageRepo *imageapi.ImageReposit } if anyFailed { - return fmt.Errorf("couldn't update some deploymentConfigs for trigger on imageRepo %s", labelForRepo(imageRepo)) + return fatalError(fmt.Sprintf("couldn't update some deploymentConfigs for trigger on imageRepo %s", labelForRepo(imageRepo))) } glog.V(4).Infof("Updated all configs for trigger on imageRepo %s", labelForRepo(imageRepo)) @@ -125,7 +118,7 @@ func triggerMatchesImage(config *deployapi.DeploymentConfig, trigger *deployapi. func (c *ImageChangeController) regenerate(imageRepo *imageapi.ImageRepository, config *deployapi.DeploymentConfig, triggers []deployapi.DeploymentTriggerImageChangeParams) error { // Get a regenerated config which includes the new image repo references - newConfig, err := c.DeploymentConfigClient.GenerateDeploymentConfig(config.Namespace, config.Name) + newConfig, err := c.deploymentConfigClient.generateDeploymentConfig(config.Namespace, config.Name) if err != nil { return fmt.Errorf("error generating new version of deploymentConfig %s: %v", labelFor(config), err) } @@ -163,9 +156,9 @@ func (c *ImageChangeController) regenerate(imageRepo *imageapi.ImageRepository, } // Persist the new config - _, err = c.DeploymentConfigClient.UpdateDeploymentConfig(newConfig.Namespace, newConfig) + _, err = c.deploymentConfigClient.updateDeploymentConfig(newConfig.Namespace, newConfig) if err != nil { - return fmt.Errorf("couldn't update deploymentConfig %s: %v", labelFor(config), err) + return err } return nil @@ -175,28 +168,33 @@ func labelForRepo(imageRepo *imageapi.ImageRepository) string { return fmt.Sprintf("%s/%s", imageRepo.Namespace, imageRepo.Name) } +// labelFor builds a string identifier for a DeploymentConfig. +func labelFor(config *deployapi.DeploymentConfig) string { + return fmt.Sprintf("%s/%s:%d", config.Namespace, config.Name, config.LatestVersion) +} + // ImageChangeControllerDeploymentConfigClient abstracts access to DeploymentConfigs. -type ImageChangeControllerDeploymentConfigClient interface { - ListDeploymentConfigs() ([]*deployapi.DeploymentConfig, error) - UpdateDeploymentConfig(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) - GenerateDeploymentConfig(namespace, name string) (*deployapi.DeploymentConfig, error) +type deploymentConfigClient interface { + listDeploymentConfigs() ([]*deployapi.DeploymentConfig, error) + updateDeploymentConfig(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) + generateDeploymentConfig(namespace, name string) (*deployapi.DeploymentConfig, error) } // ImageChangeControllerDeploymentConfigClientImpl is a pluggable ChangeStrategy. -type ImageChangeControllerDeploymentConfigClientImpl struct { - ListDeploymentConfigsFunc func() ([]*deployapi.DeploymentConfig, error) - GenerateDeploymentConfigFunc func(namespace, name string) (*deployapi.DeploymentConfig, error) - UpdateDeploymentConfigFunc func(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) +type deploymentConfigClientImpl struct { + listDeploymentConfigsFunc func() ([]*deployapi.DeploymentConfig, error) + generateDeploymentConfigFunc func(namespace, name string) (*deployapi.DeploymentConfig, error) + updateDeploymentConfigFunc func(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) } -func (i *ImageChangeControllerDeploymentConfigClientImpl) ListDeploymentConfigs() ([]*deployapi.DeploymentConfig, error) { - return i.ListDeploymentConfigsFunc() +func (i *deploymentConfigClientImpl) listDeploymentConfigs() ([]*deployapi.DeploymentConfig, error) { + return i.listDeploymentConfigsFunc() } -func (i *ImageChangeControllerDeploymentConfigClientImpl) GenerateDeploymentConfig(namespace, name string) (*deployapi.DeploymentConfig, error) { - return i.GenerateDeploymentConfigFunc(namespace, name) +func (i *deploymentConfigClientImpl) generateDeploymentConfig(namespace, name string) (*deployapi.DeploymentConfig, error) { + return i.generateDeploymentConfigFunc(namespace, name) } -func (i *ImageChangeControllerDeploymentConfigClientImpl) UpdateDeploymentConfig(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { - return i.UpdateDeploymentConfigFunc(namespace, config) +func (i *deploymentConfigClientImpl) updateDeploymentConfig(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { + return i.updateDeploymentConfigFunc(namespace, config) } diff --git a/pkg/deploy/controller/image_change_controller_test.go b/pkg/deploy/controller/imagechange/controller_test.go similarity index 82% rename from pkg/deploy/controller/image_change_controller_test.go rename to pkg/deploy/controller/imagechange/controller_test.go index c753d91338a2..4731ac0d71df 100644 --- a/pkg/deploy/controller/image_change_controller_test.go +++ b/pkg/deploy/controller/imagechange/controller_test.go @@ -1,4 +1,4 @@ -package controller +package imagechange import ( "testing" @@ -14,18 +14,21 @@ const ( nonDefaultNamespace = "nondefaultnamespace" ) -func TestUnregisteredContainer(t *testing.T) { +// TestHandle_unregisteredContainer ensures that an image update for which +// there is a trigger defined results in a no-op due to the config's +// containers not matching the trigger's containers. +func TestHandle_unregisteredContainer(t *testing.T) { controller := &ImageChangeController{ - DeploymentConfigClient: &ImageChangeControllerDeploymentConfigClientImpl{ - UpdateDeploymentConfigFunc: func(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { + deploymentConfigClient: &deploymentConfigClientImpl{ + updateDeploymentConfigFunc: func(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { t.Fatalf("unexpected deployment config update") return nil, nil }, - GenerateDeploymentConfigFunc: func(namespace, name string) (*deployapi.DeploymentConfig, error) { + generateDeploymentConfigFunc: func(namespace, name string) (*deployapi.DeploymentConfig, error) { t.Fatalf("unexpected generator call") return nil, nil }, - ListDeploymentConfigsFunc: func() ([]*deployapi.DeploymentConfig, error) { + listDeploymentConfigsFunc: func() ([]*deployapi.DeploymentConfig, error) { config := deployapitest.OkDeploymentConfig(1) config.Triggers[0].ImageChangeParams.ContainerNames = []string{"container-3"} @@ -35,25 +38,28 @@ func TestUnregisteredContainer(t *testing.T) { } // verify no-op - err := controller.HandleImageRepo(tagUpdate()) + err := controller.Handle(tagUpdate()) if err != nil { t.Fatalf("unexpected err: %v", err) } } -func TestImageChangeForNonAutomaticTag(t *testing.T) { +// TestHandle_changeForNonAutomaticTag ensures that an image update for which +// there is a matching trigger results in a no-op due to the trigger's +// automatic flag being set to false. +func TestHandle_changeForNonAutomaticTag(t *testing.T) { controller := &ImageChangeController{ - DeploymentConfigClient: &ImageChangeControllerDeploymentConfigClientImpl{ - UpdateDeploymentConfigFunc: func(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { + deploymentConfigClient: &deploymentConfigClientImpl{ + updateDeploymentConfigFunc: func(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { t.Fatalf("unexpected deployment config update") return nil, nil }, - GenerateDeploymentConfigFunc: func(namespace, name string) (*deployapi.DeploymentConfig, error) { + generateDeploymentConfigFunc: func(namespace, name string) (*deployapi.DeploymentConfig, error) { t.Fatalf("unexpected generator call") return nil, nil }, - ListDeploymentConfigsFunc: func() ([]*deployapi.DeploymentConfig, error) { + listDeploymentConfigsFunc: func() ([]*deployapi.DeploymentConfig, error) { config := deployapitest.OkDeploymentConfig(1) config.Triggers[0].ImageChangeParams.Automatic = false @@ -63,25 +69,28 @@ func TestImageChangeForNonAutomaticTag(t *testing.T) { } // verify no-op - err := controller.HandleImageRepo(tagUpdate()) + err := controller.Handle(tagUpdate()) if err != nil { t.Fatalf("unexpected err: %v", err) } } -func TestImageChangeForUnregisteredTag(t *testing.T) { +// TestHandle_changeForUnregisteredTag ensures that an image update for which +// there is a matching trigger results in a no-op due to the tag specified on +// the trigger not matching the tags defined on the image repo. +func TestHandle_changeForUnregisteredTag(t *testing.T) { controller := &ImageChangeController{ - DeploymentConfigClient: &ImageChangeControllerDeploymentConfigClientImpl{ - UpdateDeploymentConfigFunc: func(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { + deploymentConfigClient: &deploymentConfigClientImpl{ + updateDeploymentConfigFunc: func(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { t.Fatalf("unexpected deployment config update") return nil, nil }, - GenerateDeploymentConfigFunc: func(namespace, name string) (*deployapi.DeploymentConfig, error) { + generateDeploymentConfigFunc: func(namespace, name string) (*deployapi.DeploymentConfig, error) { t.Fatalf("unexpected generator call") return nil, nil }, - ListDeploymentConfigsFunc: func() ([]*deployapi.DeploymentConfig, error) { + listDeploymentConfigsFunc: func() ([]*deployapi.DeploymentConfig, error) { return []*deployapi.DeploymentConfig{imageChangeDeploymentConfig()}, nil }, }, @@ -92,13 +101,16 @@ func TestImageChangeForUnregisteredTag(t *testing.T) { imageRepo.Tags = map[string]string{ "unknown-tag": "ref-1", } - err := controller.HandleImageRepo(imageRepo) + err := controller.Handle(imageRepo) if err != nil { t.Fatalf("unexpected err: %v", err) } } -func TestImageChangeMatchScenarios(t *testing.T) { +// TestHande_matchScenarios comprehensively tests trigger definitions against +// image repo updates to ensure that the image change triggers match (or don't +// match) properly. +func TestHande_matchScenarios(t *testing.T) { params := map[string]*deployapi.DeploymentTriggerImageChangeParams{ "params.1": { Automatic: true, @@ -180,29 +192,29 @@ func TestImageChangeMatchScenarios(t *testing.T) { generated := false controller := &ImageChangeController{ - DeploymentConfigClient: &ImageChangeControllerDeploymentConfigClientImpl{ - UpdateDeploymentConfigFunc: func(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { + deploymentConfigClient: &deploymentConfigClientImpl{ + updateDeploymentConfigFunc: func(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { if !s.matches { t.Fatalf("unexpected deployment config update for scenario: %v", s) } updated = true return config, nil }, - GenerateDeploymentConfigFunc: func(namespace, name string) (*deployapi.DeploymentConfig, error) { + generateDeploymentConfigFunc: func(namespace, name string) (*deployapi.DeploymentConfig, error) { if !s.matches { t.Fatalf("unexpected generator call for scenario: %v", s) } generated = true return config, nil }, - ListDeploymentConfigsFunc: func() ([]*deployapi.DeploymentConfig, error) { + listDeploymentConfigsFunc: func() ([]*deployapi.DeploymentConfig, error) { return []*deployapi.DeploymentConfig{config}, nil }, }, } t.Logf("running scenario: %v", s) - err := controller.HandleImageRepo(updates[s.repo]) + err := controller.Handle(updates[s.repo]) if err != nil { t.Fatalf("unexpected error for scenario %v: %v", s, err) diff --git a/pkg/deploy/controller/imagechange/factory.go b/pkg/deploy/controller/imagechange/factory.go new file mode 100644 index 000000000000..96b7fe007135 --- /dev/null +++ b/pkg/deploy/controller/imagechange/factory.go @@ -0,0 +1,83 @@ +package imagechange + +import ( + kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + kutil "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + + osclient "github.com/openshift/origin/pkg/client" + controller "github.com/openshift/origin/pkg/controller" + deployapi "github.com/openshift/origin/pkg/deploy/api" + deployutil "github.com/openshift/origin/pkg/deploy/util" + imageapi "github.com/openshift/origin/pkg/image/api" +) + +// ImageChangeControllerFactory can create an ImageChangeController which +// watches all ImageRepository changes. +type ImageChangeControllerFactory struct { + // Client is an OpenShift client. + Client osclient.Interface +} + +// Create creates an ImageChangeController. +func (factory *ImageChangeControllerFactory) Create() controller.RunnableController { + imageRepositoryLW := &deployutil.ListWatcherImpl{ + ListFunc: func() (runtime.Object, error) { + return factory.Client.ImageRepositories(kapi.NamespaceAll).List(labels.Everything(), labels.Everything()) + }, + WatchFunc: func(resourceVersion string) (watch.Interface, error) { + return factory.Client.ImageRepositories(kapi.NamespaceAll).Watch(labels.Everything(), labels.Everything(), resourceVersion) + }, + } + queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc) + cache.NewReflector(imageRepositoryLW, &imageapi.ImageRepository{}, queue).Run() + + deploymentConfigLW := &deployutil.ListWatcherImpl{ + ListFunc: func() (runtime.Object, error) { + return factory.Client.DeploymentConfigs(kapi.NamespaceAll).List(labels.Everything(), labels.Everything()) + }, + WatchFunc: func(resourceVersion string) (watch.Interface, error) { + return factory.Client.DeploymentConfigs(kapi.NamespaceAll).Watch(labels.Everything(), labels.Everything(), resourceVersion) + }, + } + store := cache.NewStore(cache.MetaNamespaceKeyFunc) + cache.NewReflector(deploymentConfigLW, &deployapi.DeploymentConfig{}, store).Run() + + changeController := &ImageChangeController{ + deploymentConfigClient: &deploymentConfigClientImpl{ + listDeploymentConfigsFunc: func() ([]*deployapi.DeploymentConfig, error) { + configs := []*deployapi.DeploymentConfig{} + objs := store.List() + for _, obj := range objs { + configs = append(configs, obj.(*deployapi.DeploymentConfig)) + } + return configs, nil + }, + generateDeploymentConfigFunc: func(namespace, name string) (*deployapi.DeploymentConfig, error) { + return factory.Client.DeploymentConfigs(namespace).Generate(name) + }, + updateDeploymentConfigFunc: func(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { + return factory.Client.DeploymentConfigs(namespace).Update(config) + }, + }, + } + + return &controller.RetryController{ + Queue: queue, + RetryManager: controller.NewQueueRetryManager(queue, cache.MetaNamespaceKeyFunc, 1), + ShouldRetry: func(obj interface{}, err error) bool { + if _, isFatal := err.(fatalError); isFatal { + kutil.HandleError(err) + return false + } + return true + }, + Handle: func(obj interface{}) error { + repo := obj.(*imageapi.ImageRepository) + return changeController.Handle(repo) + }, + } +} diff --git a/test/integration/deploy_trigger_test.go b/test/integration/deploy_trigger_test.go index 8e147c53eb23..2b442b40f230 100644 --- a/test/integration/deploy_trigger_test.go +++ b/test/integration/deploy_trigger_test.go @@ -28,8 +28,9 @@ import ( buildetcd "github.com/openshift/origin/pkg/build/registry/etcd" osclient "github.com/openshift/origin/pkg/client" deployapi "github.com/openshift/origin/pkg/deploy/api" + configchangecontroller "github.com/openshift/origin/pkg/deploy/controller/configchange" deployconfigcontroller "github.com/openshift/origin/pkg/deploy/controller/deploymentconfig" - deploycontrollerfactory "github.com/openshift/origin/pkg/deploy/controller/factory" + imagechangecontroller "github.com/openshift/origin/pkg/deploy/controller/imagechange" deployconfiggenerator "github.com/openshift/origin/pkg/deploy/generator" deployregistry "github.com/openshift/origin/pkg/deploy/registry/deploy" deployconfigregistry "github.com/openshift/origin/pkg/deploy/registry/deployconfig" @@ -384,19 +385,16 @@ func NewTestOpenshift(t *testing.T) *testOpenshift { } dccFactory.Create().Run() - cccFactory := deploycontrollerfactory.DeploymentConfigChangeControllerFactory{ + cccFactory := configchangecontroller.DeploymentConfigChangeControllerFactory{ Client: osClient, KubeClient: kubeClient, Codec: latest.Codec, - Stop: openshift.stop, } cccFactory.Create().Run() - iccFactory := deploycontrollerfactory.ImageChangeControllerFactory{ + iccFactory := imagechangecontroller.ImageChangeControllerFactory{ Client: osClient, - Stop: openshift.stop, } - iccFactory.Create().Run() biccFactory := buildcontrollerfactory.ImageChangeControllerFactory{