diff --git a/pkg/cmd/server/origin/master.go b/pkg/cmd/server/origin/master.go index f44926aef045..dd46c92c3f34 100644 --- a/pkg/cmd/server/origin/master.go +++ b/pkg/cmd/server/origin/master.go @@ -52,6 +52,7 @@ import ( deploycontroller "github.com/openshift/origin/pkg/deploy/controller/deployment" deployconfigcontroller "github.com/openshift/origin/pkg/deploy/controller/deploymentconfig" imagechangecontroller "github.com/openshift/origin/pkg/deploy/controller/imagechange" + execnewpodcontroller "github.com/openshift/origin/pkg/deploy/controller/lifecycle/execnewpod" deployconfiggenerator "github.com/openshift/origin/pkg/deploy/generator" deployregistry "github.com/openshift/origin/pkg/deploy/registry/deploy" deployconfigregistry "github.com/openshift/origin/pkg/deploy/registry/deployconfig" @@ -698,6 +699,7 @@ func (c *MasterConfig) RunDeployerPodController() { _, kclient := c.DeploymentControllerClients() factory := deployerpodcontroller.DeployerPodControllerFactory{ KubeClient: kclient, + Codec: latest.Codec, } controller := factory.Create() @@ -733,6 +735,16 @@ func (c *MasterConfig) RunDeploymentImageChangeTriggerController() { controller.Run() } +func (c *MasterConfig) RunExecNewPodController() { + _, kclient := c.DeploymentControllerClients() + factory := &execnewpodcontroller.ExecNewPodControllerFactory{ + KubeClient: kclient, + } + + controller := factory.Create() + controller.Run() +} + // RouteAllocator returns a route allocation controller. func (c *MasterConfig) RouteAllocator() *routeallocationcontroller.RouteAllocationController { factory := routeallocationcontroller.RouteAllocationControllerFactory{ diff --git a/pkg/deploy/api/types.go b/pkg/deploy/api/types.go index abbe9e5bf180..1e1abd87fa59 100644 --- a/pkg/deploy/api/types.go +++ b/pkg/deploy/api/types.go @@ -52,6 +52,8 @@ type DeploymentStrategy struct { Type DeploymentStrategyType `json:"type,omitempty"` // CustomParams are the input to the Custom deployment strategy. CustomParams *CustomDeploymentStrategyParams `json:"customParams,omitempty"` + // Lifecycle provides optional hooks into the deployment process. + Lifecycle *Lifecycle `json:"lifecycle,omitempty"` } // DeploymentStrategyType refers to a specific DeploymentStrategy implementation. @@ -74,6 +76,85 @@ type CustomDeploymentStrategyParams struct { Command []string `json:"command,omitempty"` } +// Lifecycle describes actions the system should take in response to +// deployment lifecycle events. The deployment process blocks while executing +// lifecycle handlers. A HandleFailurePolicy determines what action is taken +// in response to a failed handler. +type Lifecycle struct { + // Pre is called immediately before the deployment strategy executes. + Pre *Handler `json:"pre,omitempty"` + // Post is called immediately after the deployment strategy executes. + // NOTE: AbortHandlerFailurePolicy is not supported for Post. + Post *Handler `json:"post,omitempty"` +} + +// Handler defines a specific deployment lifecycle action. +type Handler struct { + // Status is the current status of the handler. + Status DeploymentLifecycleStatus `json:"status"` + // FailurePolicy specifies what action to take if the handler fails. + FailurePolicy HandlerFailurePolicy `json:"failurePolicy"` + // ExecNewPod specifies the action to take. + ExecNewPod *ExecNewPodAction `json:"execNewPod,omitempty"` +} + +// DeploymentLifecycleStatus represents the status of a lifecycle action. +type DeploymentLifecycleStatus string + +const ( + // DeploymentLifecycleStatusPending means the action has not yet executed. + DeploymentLifecycleStatusPending DeploymentLifecycleStatus = "Pending" + // DeploymentLifecycleStatusRunning means the action is currently executing. + DeploymentLifecycleStatusRunning DeploymentLifecycleStatus = "Running" + // DeploymentLifecycleStatusComplete means the action completed + // successfully, taking into account failure policy. + DeploymentLifecycleStatusComplete DeploymentLifecycleStatus = "Complete" + // DeploymentLifecycleStatusFailed means the action failed to execute, + // taking into account failure policy. + DeploymentLifecycleStatusFailed DeploymentLifecycleStatus = "Failed" + // DeploymentLifecycleStatusCompleteWithErrors means the action failed to + // execute, but the failure was ignored due to failure policy. + DeploymentLifecycleStatusCompleteWithErrors DeploymentLifecycleStatus = "CompleteWithErrors" +) + +// HandlerFailurePolicy describes the action to take if a handler fails. +type HandlerFailurePolicy string + +const ( + // HandlerFailurePolicyRetry means retry the handler until it succeeds. + HandlerFailurePolicyRetry HandlerFailurePolicy = "Retry" + // HandlerFailurePolicyAbort means abort the deployment (if possible). + HandlerFailurePolicyAbort HandlerFailurePolicy = "Abort" + // HandlerFailurePolicyIgnore means ignore failure and continue the deployment. + HandlerFailurePolicyIgnore HandlerFailurePolicy = "Ignore" +) + +// ExecNewPodAction runs a command in a new pod based on the specified +// container which is assumed to be part of the deployment template. +type ExecNewPodAction struct { + // Command is the action command and its arguments. + Command []string `json:"command"` + // Env is a set of environment variables to supply to the action's container. + Env []kapi.EnvVar `json:"env,omitempty"` + // ContainerName is the name of a container in the deployment pod + // template whose Docker image will be used for the action's container. + ContainerName string `json:"containerName"` +} + +// These annotations are used to track pods for ExecNewPodAction. +const ( + // PreExecNewPodActionPodAnnotation is the name of a pre-deployment pod. + PreExecNewPodActionPodAnnotation = "openshift.io/deployment.lifecycle.pre.execnewpod.pod" + // PreExecNewPodActionPodPhaseAnnotation is the phase of a pre-deployment + // pod and is used to track its status and outcome. + PreExecNewPodActionPodPhaseAnnotation = "openshift.io/deployment.lifecycle.pre.execnewpod.phase" + // PostExecNewPodActionPodAnnotation is the name of a post-deployment pod. + PostExecNewPodActionPodAnnotation = "openshift.io/deployment.lifecycle.post.execnewpod.pod" + // PostDeploymentHookPodPhaseAnnotation is the phase of a post-deployment + // pod and is used to track its status and outcome. + PostExecNewPodActionPodPhaseAnnotation = "openshift.io/deployment.lifecycle.post.execnewpod.phase" +) + // A DeploymentList is a collection of deployments. // DEPRECATED: Like Deployment, this is no longer used. type DeploymentList struct { diff --git a/pkg/deploy/api/v1beta1/types.go b/pkg/deploy/api/v1beta1/types.go index 0c673a2f8f65..758d813cd12c 100644 --- a/pkg/deploy/api/v1beta1/types.go +++ b/pkg/deploy/api/v1beta1/types.go @@ -53,6 +53,8 @@ type DeploymentStrategy struct { Type DeploymentStrategyType `json:"type,omitempty"` // CustomParams are the input to the Custom deployment strategy. CustomParams *CustomDeploymentStrategyParams `json:"customParams,omitempty"` + // Lifecycle provides optional hooks into the deployment process. + Lifecycle *Lifecycle `json:"lifecycle,omitempty"` } // DeploymentStrategyType refers to a specific DeploymentStrategy implementation. @@ -75,6 +77,82 @@ type CustomDeploymentStrategyParams struct { Command []string `json:"command,omitempty"` } +// Lifecycle describes actions the system should take in response to +// deployment lifecycle events. The deployment process blocks while executing +// lifecycle handlers. A HandleFailurePolicy determines what action is taken +// in response to a failed handler. +type Lifecycle struct { + // Pre is called immediately before the deployment strategy executes. + Pre *Handler `json:"pre,omitempty"` + // Post is called immediately after the deployment strategy executes. + // NOTE: AbortHandlerFailurePolicy is not supported for Post. + Post *Handler `json:"post,omitempty"` +} + +// Handler defines a specific deployment lifecycle action. +type Handler struct { + // Status is the current status of the handler. + Status DeploymentLifecycleStatus `json:"status"` + // FailurePolicy specifies what action to take if the handler fails. + FailurePolicy HandlerFailurePolicy `json:"failurePolicy"` + // ExecNewPod specifies the action to take. + ExecNewPod *ExecNewPodAction `json:"execNewPod,omitempty"` +} + +// DeploymentLifecycleStatus represents the status of a lifecycle action. +type DeploymentLifecycleStatus string + +const ( + // DeploymentLifecycleStatusPending means the action has not yet executed. + DeploymentLifecycleStatusPending DeploymentLifecycleStatus = "Pending" + // DeploymentLifecycleStatusRunning means the action is currently executing. + DeploymentLifecycleStatusRunning DeploymentLifecycleStatus = "Running" + // DeploymentLifecycleStatusComplete means the action completed + // successfully, taking into account failure policy. + DeploymentLifecycleStatusComplete DeploymentLifecycleStatus = "Complete" + // DeploymentLifecycleStatusFailed means the action failed to execute, + // taking into account failure policy. + DeploymentLifecycleStatusFailed DeploymentLifecycleStatus = "Failed" +) + +// HandlerFailurePolicy describes the action to take if a handler fails. +type HandlerFailurePolicy string + +const ( + // HandlerFailurePolicyRetry means retry the handler until it succeeds. + HandlerFailurePolicyRetry HandlerFailurePolicy = "Retry" + // HandlerFailurePolicyAbort means abort the deployment (if possible). + HandlerFailurePolicyAbort HandlerFailurePolicy = "Abort" + // HandlerFailurePolicyIgnore means ignore failure and continue the deployment. + HandlerFailurePolicyIgnore HandlerFailurePolicy = "Ignore" +) + +// ExecNewPodAction runs a command in a new pod based on the specified +// container which is assumed to be part of the deployment template. +type ExecNewPodAction struct { + // Command is the action command and its arguments. + Command []string `json:"command"` + // Env is a set of environment variables to supply to the action's container. + Env []kapi.EnvVar `json:"env,omitempty"` + // ContainerName is the name of a container in the deployment pod + // template whose Docker image will be used for the action's container. + ContainerName string `json:"containerName"` +} + +// These annotations are used to track pods for ExecNewPodAction. +const ( + // PreExecNewPodActionPodAnnotation is the name of a pre-deployment pod. + PreExecNewPodActionPodAnnotation = "openshift.io/deployment.lifecycle.pre.execnewpod.pod" + // PreExecNewPodActionPodPhaseAnnotation is the phase of a pre-deployment + // pod and is used to track its status and outcome. + PreExecNewPodActionPodPhaseAnnotation = "openshift.io/deployment.lifecycle.pre.execnewpod.phase" + // PostExecNewPodActionPodAnnotation is the name of a post-deployment pod. + PostExecNewPodActionPodAnnotation = "openshift.io/deployment.lifecycle.post.execnewpod.pod" + // PostDeploymentHookPodPhaseAnnotation is the phase of a post-deployment + // pod and is used to track its status and outcome. + PostExecNewPodActionPodPhaseAnnotation = "openshift.io/deployment.lifecycle.post.execnewpod.phase" +) + // A DeploymentList is a collection of deployments. // DEPRECATED: Like Deployment, this is no longer used. type DeploymentList struct { diff --git a/pkg/deploy/controller/deployerpod/controller.go b/pkg/deploy/controller/deployerpod/controller.go index 219dcdde6255..c8ed0f52547d 100644 --- a/pkg/deploy/controller/deployerpod/controller.go +++ b/pkg/deploy/controller/deployerpod/controller.go @@ -8,6 +8,7 @@ import ( kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" deployapi "github.com/openshift/origin/pkg/deploy/api" + lifecycle "github.com/openshift/origin/pkg/deploy/controller/lifecycle" ) // DeployerPodController keeps a deployment's status in sync with the deployer pod @@ -17,6 +18,7 @@ import ( type DeployerPodController struct { // deploymentClient provides access to deployments. deploymentClient deploymentClient + lifecycleManager lifecycle.Interface } // Handle syncs pod's status with any associated deployment. @@ -36,10 +38,35 @@ func (c *DeployerPodController) Handle(pod *kapi.Pod) error { currentStatus := statusFor(deployment) nextStatus := currentStatus +statusSwitch: switch pod.Status.Phase { case kapi.PodRunning: nextStatus = deployapi.DeploymentStatusRunning case kapi.PodSucceeded, kapi.PodFailed: + // Before finalizing the status of the deployment, let any post action + // execute. The post action can't cause the deployment to be failed at + // this point. + postStatus, err := c.lifecycleManager.Status(lifecycle.PostDeploymentContext, deployment) + if err != nil { + return fmt.Errorf("couldn't determine post hook status for %s: %s", labelForDeployment(deployment), err) + } + switch postStatus { + case deployapi.DeploymentLifecycleStatusPending: + err := c.lifecycleManager.Execute(lifecycle.PostDeploymentContext, deployment) + if err != nil { + return fmt.Errorf("couldn't execute post hook for %s: %v", labelForDeployment(deployment), err) + } + // block the deployment + break statusSwitch + case deployapi.DeploymentLifecycleStatusRunning: + // block the deployment + break statusSwitch + case deployapi.DeploymentLifecycleStatusFailed, + deployapi.DeploymentLifecycleStatusComplete, + deployapi.DeploymentLifecycleStatusCompleteWithErrors: + // continue the deployment + } + nextStatus = deployapi.DeploymentStatusComplete // Detect failure based on the container state for _, info := range pod.Status.ContainerStatuses { diff --git a/pkg/deploy/controller/deployerpod/factory.go b/pkg/deploy/controller/deployerpod/factory.go index cf5715b6667c..3a0c4ba2ca66 100644 --- a/pkg/deploy/controller/deployerpod/factory.go +++ b/pkg/deploy/controller/deployerpod/factory.go @@ -15,6 +15,8 @@ import ( controller "github.com/openshift/origin/pkg/controller" deployapi "github.com/openshift/origin/pkg/deploy/api" + lifecycle "github.com/openshift/origin/pkg/deploy/controller/lifecycle" + execnewpod "github.com/openshift/origin/pkg/deploy/controller/lifecycle/execnewpod" deployutil "github.com/openshift/origin/pkg/deploy/util" ) @@ -24,6 +26,8 @@ import ( type DeployerPodControllerFactory struct { // KubeClient is a Kubernetes client. KubeClient kclient.Interface + // Codec is used for encoding/decoding. + Codec runtime.Codec } // Create creates a DeployerPodController. @@ -56,6 +60,21 @@ func (factory *DeployerPodControllerFactory) Create() controller.RunnableControl } cache.NewPoller(pollFunc, 10*time.Second, podQueue).Run() + lifecycleManager := &lifecycle.LifecycleManager{ + Plugins: []lifecycle.Plugin{ + &execnewpod.Plugin{ + PodClient: &execnewpod.PodClientImpl{ + CreatePodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) { + return factory.KubeClient.Pods(namespace).Create(pod) + }, + }, + }, + }, + DecodeConfig: func(deployment *kapi.ReplicationController) (*deployapi.DeploymentConfig, error) { + return deployutil.DecodeDeploymentConfig(deployment, factory.Codec) + }, + } + podController := &DeployerPodController{ deploymentClient: &deploymentClientImpl{ getDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { @@ -65,6 +84,7 @@ func (factory *DeployerPodControllerFactory) Create() controller.RunnableControl return factory.KubeClient.ReplicationControllers(namespace).Update(deployment) }, }, + lifecycleManager: lifecycleManager, } return &controller.RetryController{ diff --git a/pkg/deploy/controller/deployment/controller.go b/pkg/deploy/controller/deployment/controller.go index f36cf09e6938..8eca4ec441f0 100644 --- a/pkg/deploy/controller/deployment/controller.go +++ b/pkg/deploy/controller/deployment/controller.go @@ -9,6 +9,7 @@ import ( kerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" deployapi "github.com/openshift/origin/pkg/deploy/api" + lifecycle "github.com/openshift/origin/pkg/deploy/controller/lifecycle" deployutil "github.com/openshift/origin/pkg/deploy/util" ) @@ -32,6 +33,8 @@ type DeploymentController struct { makeContainer func(strategy *deployapi.DeploymentStrategy) (*kapi.Container, error) // decodeConfig knows how to decode the deploymentConfig from a deployment's annotations. decodeConfig func(deployment *kapi.ReplicationController) (*deployapi.DeploymentConfig, error) + // todo + lifecycleManager lifecycle.Interface } // fatalError is an error which can't be retried. @@ -45,8 +48,37 @@ func (c *DeploymentController) Handle(deployment *kapi.ReplicationController) er currentStatus := statusFor(deployment) nextStatus := currentStatus +statusSwitch: switch currentStatus { case deployapi.DeploymentStatusNew: + // Before kicking off the strategy, handle any pre action and ensure the + // action has reached a terminal status. A failed pre action may cause the + // deployment to fail. + preStatus, err := c.lifecycleManager.Status(lifecycle.PreDeploymentContext, deployment) + if err != nil { + return fatalError(fmt.Sprintf("couldn't determine pre hook status for %s: %s", labelForDeployment(deployment), err)) + } + + switch preStatus { + case deployapi.DeploymentLifecycleStatusPending: + err := c.lifecycleManager.Execute(lifecycle.PreDeploymentContext, deployment) + if err != nil { + return fatalError(fmt.Sprintf("couldn't execute pre hook for %s: %v", labelForDeployment(deployment), err)) + } + // block the deployment + break statusSwitch + case deployapi.DeploymentLifecycleStatusRunning: + // block the deployment + break statusSwitch + case deployapi.DeploymentLifecycleStatusFailed: + // fail the deployment + nextStatus = deployapi.DeploymentStatusFailed + break statusSwitch + case deployapi.DeploymentLifecycleStatusComplete, + deployapi.DeploymentLifecycleStatusCompleteWithErrors: + // continue the deployment + } + podTemplate, err := c.makeDeployerPod(deployment) if err != nil { return fatalError(fmt.Sprintf("couldn't make deployer pod for %s: %v", labelForDeployment(deployment), err)) diff --git a/pkg/deploy/controller/deployment/factory.go b/pkg/deploy/controller/deployment/factory.go index a83f0d95df6a..da7dbd5af4dd 100644 --- a/pkg/deploy/controller/deployment/factory.go +++ b/pkg/deploy/controller/deployment/factory.go @@ -15,6 +15,8 @@ import ( controller "github.com/openshift/origin/pkg/controller" deployapi "github.com/openshift/origin/pkg/deploy/api" + lifecycle "github.com/openshift/origin/pkg/deploy/controller/lifecycle" + execnewpod "github.com/openshift/origin/pkg/deploy/controller/lifecycle/execnewpod" deployutil "github.com/openshift/origin/pkg/deploy/util" ) @@ -44,6 +46,21 @@ func (factory *DeploymentControllerFactory) Create() controller.RunnableControll deploymentQueue := cache.NewFIFO(cache.MetaNamespaceKeyFunc) cache.NewReflector(deploymentLW, &kapi.ReplicationController{}, deploymentQueue, 2*time.Minute).Run() + lifecycleManager := &lifecycle.LifecycleManager{ + Plugins: []lifecycle.Plugin{ + &execnewpod.Plugin{ + PodClient: &execnewpod.PodClientImpl{ + CreatePodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) { + return factory.KubeClient.Pods(namespace).Create(pod) + }, + }, + }, + }, + DecodeConfig: func(deployment *kapi.ReplicationController) (*deployapi.DeploymentConfig, error) { + return deployutil.DecodeDeploymentConfig(deployment, factory.Codec) + }, + } + deployController := &DeploymentController{ deploymentClient: &deploymentClientImpl{ getDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { @@ -67,6 +84,7 @@ func (factory *DeploymentControllerFactory) Create() controller.RunnableControll decodeConfig: func(deployment *kapi.ReplicationController) (*deployapi.DeploymentConfig, error) { return deployutil.DecodeDeploymentConfig(deployment, factory.Codec) }, + lifecycleManager: lifecycleManager, } return &controller.RetryController{ diff --git a/pkg/deploy/controller/lifecycle/execnewpod/controller.go b/pkg/deploy/controller/lifecycle/execnewpod/controller.go new file mode 100644 index 000000000000..9cb150169d38 --- /dev/null +++ b/pkg/deploy/controller/lifecycle/execnewpod/controller.go @@ -0,0 +1,95 @@ +package execnewpod + +import ( + "fmt" + + "github.com/golang/glog" + + kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + + deployapi "github.com/openshift/origin/pkg/deploy/api" +) + +// ExecNewPodController keeps an ExecNewPodAction pod's status in sync with +// its associated deployment. +// +// Use ExecNewPodControllerControllerFactory to create this controller. +type ExecNewPodController struct { + // deploymentClient provides access to deployments. + deploymentClient deploymentClient +} + +// Handle syncs the action pod's status with its associated deployment. +func (c *ExecNewPodController) Handle(pod *kapi.Pod) error { + // Verify the assumption that we'll be given only pods correlated to a deployment + deploymentName, hasDeploymentName := pod.Annotations[deployapi.DeploymentAnnotation] + if !hasDeploymentName { + glog.V(4).Infof("Ignoring pod %s; no deployment annotation found", pod.Name) + return nil + } + + deployment, deploymentErr := c.deploymentClient.getDeployment(pod.Namespace, deploymentName) + if deploymentErr != nil { + return fmt.Errorf("couldn't get deployment %s/%s associated with pod %s", pod.Namespace, deploymentName, pod.Name) + } + + // Decide if this pod actually represents a lifecycle action + prePodName, hasPrePodName := deployment.Annotations[deployapi.PreExecNewPodActionPodAnnotation] + postPodName, hasPostPodName := deployment.Annotations[deployapi.PostExecNewPodActionPodAnnotation] + + if !hasPrePodName || !hasPostPodName { + glog.V(4).Infof("Ignoring pod %s; no ExecNewPod annotations found on associated deployment %s", pod.Name, labelForDeployment(deployment)) + return nil + } + + if pod.Name != prePodName && pod.Name != postPodName { + glog.V(4).Infof("Ignoring pod %s; name doesn't match lifeycle annotations on associated deployment %s", pod.Name, labelForDeployment(deployment)) + return nil + } + + // Determine whether this is a pre or post action pod so we can update the + // right annotation on the deployment. + var phaseAnnotation string + if hasPrePodName && prePodName == pod.Name { + phaseAnnotation = deployapi.PreExecNewPodActionPodPhaseAnnotation + } else if hasPostPodName && postPodName == pod.Name { + phaseAnnotation = deployapi.PostExecNewPodActionPodPhaseAnnotation + } else { + glog.V(4).Infof("Ignoring pod %s; name doesn't match lifeycle annotations on associated deployment %s", pod.Name, labelForDeployment(deployment)) + return nil + } + + // Update the deployment to hold the latest status of the action pod. + currentPhase := deployment.Annotations[phaseAnnotation] + nextPhase := string(pod.Status.Phase) + + if currentPhase != nextPhase { + deployment.Annotations[phaseAnnotation] = nextPhase + if _, err := c.deploymentClient.updateDeployment(deployment.Namespace, deployment); err != nil { + return fmt.Errorf("couldn't update deployment %s annotation %s from %s to %S: %v", labelForDeployment(deployment), phaseAnnotation, currentPhase, nextPhase, err) + } + glog.V(2).Infof("Updated deployment %s annotation %s from %s to %s", labelForDeployment(deployment), phaseAnnotation, currentPhase, nextPhase) + } + + return nil +} + +// deploymentClient abstracts access to deployments. +type deploymentClient interface { + getDeployment(namespace, name string) (*kapi.ReplicationController, error) + updateDeployment(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) +} + +// deploymentClientImpl is a pluggable deploymentControllerDeploymentClient. +type deploymentClientImpl struct { + getDeploymentFunc func(namespace, name string) (*kapi.ReplicationController, error) + updateDeploymentFunc func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) +} + +func (i *deploymentClientImpl) getDeployment(namespace, name string) (*kapi.ReplicationController, error) { + return i.getDeploymentFunc(namespace, name) +} + +func (i *deploymentClientImpl) updateDeployment(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + return i.updateDeploymentFunc(namespace, deployment) +} diff --git a/pkg/deploy/controller/lifecycle/execnewpod/factory.go b/pkg/deploy/controller/lifecycle/execnewpod/factory.go new file mode 100644 index 000000000000..05319d00ce2b --- /dev/null +++ b/pkg/deploy/controller/lifecycle/execnewpod/factory.go @@ -0,0 +1,129 @@ +package execnewpod + +import ( + "time" + + "github.com/golang/glog" + + kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + kclient "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + + controller "github.com/openshift/origin/pkg/controller" + deployapi "github.com/openshift/origin/pkg/deploy/api" + deployutil "github.com/openshift/origin/pkg/deploy/util" +) + +// DeployerPodControllerFactory can create a DeployerPodController which gets +// pods from a queue populated from a watch of all pods filtered by a cache of +// deployments associated with pods. +type ExecNewPodControllerFactory struct { + // KubeClient is a Kubernetes client. + KubeClient kclient.Interface +} + +// Create creates a DeployerPodController. +func (factory *ExecNewPodControllerFactory) Create() controller.RunnableController { + deploymentLW := &deployutil.ListWatcherImpl{ + ListFunc: func() (runtime.Object, error) { + return factory.KubeClient.ReplicationControllers(kapi.NamespaceAll).List(labels.Everything()) + }, + WatchFunc: func(resourceVersion string) (watch.Interface, error) { + return factory.KubeClient.ReplicationControllers(kapi.NamespaceAll).Watch(labels.Everything(), fields.Everything(), resourceVersion) + }, + } + deploymentStore := cache.NewStore(cache.MetaNamespaceKeyFunc) + cache.NewReflector(deploymentLW, &kapi.ReplicationController{}, deploymentStore, 2*time.Minute).Run() + + // Kubernetes does not currently synchronize Pod status in storage with a Pod's container + // states. Because of this, we can't receive events related to container (and thus Pod) + // state changes, such as Running -> Terminated. As a workaround, populate the FIFO with + // a polling implementation which relies on client calls to list Pods - the Get/List + // REST implementations will populate the synchronized container/pod status on-demand. + // + // TODO: Find a way to get watch events for Pod/container status updates. The polling + // strategy is horribly inefficient and should be addressed upstream somehow. + podQueue := cache.NewFIFO(cache.MetaNamespaceKeyFunc) + pollFunc := func() (cache.Enumerator, error) { + return pollPods(deploymentStore, factory.KubeClient) + } + cache.NewPoller(pollFunc, 10*time.Second, podQueue).Run() + + podController := &ExecNewPodController{ + deploymentClient: &deploymentClientImpl{ + getDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { + return factory.KubeClient.ReplicationControllers(namespace).Get(name) + }, + updateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + return factory.KubeClient.ReplicationControllers(namespace).Update(deployment) + }, + }, + } + + return &controller.RetryController{ + Queue: podQueue, + RetryManager: controller.NewQueueRetryManager( + podQueue, + cache.MetaNamespaceKeyFunc, + func(obj interface{}, err error, count int) bool { return count < 1 }, + ), + Handle: func(obj interface{}) error { + pod := obj.(*kapi.Pod) + return podController.Handle(pod) + }, + } +} + +func pollPods(deploymentStore cache.Store, kClient kclient.PodsNamespacer) (cache.Enumerator, error) { + list := &kapi.PodList{} + + for _, obj := range deploymentStore.List() { + deployment := obj.(*kapi.ReplicationController) + status := deployapi.DeploymentStatus(deployment.Annotations[deployapi.DeploymentStatusAnnotation]) + switch status { + case deployapi.DeploymentStatusNew, deployapi.DeploymentStatusPending, deployapi.DeploymentStatusRunning: + // Only return lifecycle pods for the deployment + lifecyclePods := []string{} + if name, ok := deployment.Annotations[deployapi.PreExecNewPodActionPodAnnotation]; ok { + lifecyclePods = append(lifecyclePods, name) + } + + if name, ok := deployment.Annotations[deployapi.PostExecNewPodActionPodAnnotation]; ok { + lifecyclePods = append(lifecyclePods, name) + } + + for _, name := range lifecyclePods { + pod, err := kClient.Pods(deployment.Namespace).Get(name) + if err != nil { + glog.V(2).Infof("Couldn't find pod %s for deployment %s: %#v", name, deployment.Name, err) + continue + } + list.Items = append(list.Items, *pod) + } + } + } + + return &podEnumerator{list}, nil +} + +// podEnumerator allows a cache.Poller to enumerate items in an api.PodList +type podEnumerator struct { + *kapi.PodList +} + +// Len returns the number of items in the pod list. +func (pe *podEnumerator) Len() int { + if pe.PodList == nil { + return 0 + } + return len(pe.Items) +} + +// Get returns the item (and ID) with the particular index. +func (pe *podEnumerator) Get(index int) interface{} { + return &pe.Items[index] +} diff --git a/pkg/deploy/controller/lifecycle/execnewpod/plugin.go b/pkg/deploy/controller/lifecycle/execnewpod/plugin.go new file mode 100644 index 000000000000..a291b85429c8 --- /dev/null +++ b/pkg/deploy/controller/lifecycle/execnewpod/plugin.go @@ -0,0 +1,112 @@ +package execnewpod + +import ( + "fmt" + "strings" + + "github.com/golang/glog" + + kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + kerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" + + deployapi "github.com/openshift/origin/pkg/deploy/api" + lifecycle "github.com/openshift/origin/pkg/deploy/controller/lifecycle" +) + +// Plugin is a lifecycle Plugin which knows how to work with ExecNewPod handlers. +type Plugin struct { + // PodClient provides access to pods. + PodClient PodClient +} + +var _ = lifecycle.Plugin(&Plugin{}) + +func (p *Plugin) CanHandle(handler *deployapi.Handler) bool { + return handler.ExecNewPod != nil +} + +func (p *Plugin) Execute(context lifecycle.DeploymentContext, handler *deployapi.Handler, deployment *kapi.ReplicationController, config *deployapi.DeploymentConfig) error { + podSpec := &kapi.Pod{ + ObjectMeta: kapi.ObjectMeta{ + GenerateName: fmt.Sprintf("deployment-%s-lifecycle-%s-", deployment.Name, strings.ToLower(string(context))), + Annotations: map[string]string{ + deployapi.DeploymentAnnotation: deployment.Name, + }, + }, + Spec: kapi.PodSpec{ + Containers: []kapi.Container{ + { + Name: "lifecycle", + Image: "tianon/true", + }, + }, + // TODO: policy handling + RestartPolicy: kapi.RestartPolicyNever, + }, + } + + pod, err := p.PodClient.CreatePod(deployment.Namespace, podSpec) + if err != nil { + if !kerrors.IsAlreadyExists(err) { + return fmt.Errorf("couldn't create lifecycle pod for %s: %v", labelForDeployment(deployment), err) + } + } else { + glog.V(2).Infof("Created lifecycle pod %s for deployment %s", pod.Name, labelForDeployment(deployment)) + } + + return nil +} + +func (p *Plugin) Status(context lifecycle.DeploymentContext, handler *deployapi.Handler, deployment *kapi.ReplicationController) deployapi.DeploymentLifecycleStatus { + podAnnotation, phaseAnnotation := annotationsFor(context) + podName := deployment.Annotations[podAnnotation] + if len(podName) == 0 { + return deployapi.DeploymentLifecycleStatusPending + } + + phase := kapi.PodPhase(deployment.Annotations[phaseAnnotation]) + + var status deployapi.DeploymentLifecycleStatus + switch phase { + case kapi.PodPending, kapi.PodRunning, kapi.PodUnknown: + status = deployapi.DeploymentLifecycleStatusRunning + case kapi.PodSucceeded: + status = deployapi.DeploymentLifecycleStatusComplete + case kapi.PodFailed: + status = deployapi.DeploymentLifecycleStatusFailed + } + return status +} + +func annotationsFor(context lifecycle.DeploymentContext) (string, string) { + var podAnnotation string + var phaseAnnotation string + switch context { + case lifecycle.PreDeploymentContext: + podAnnotation = deployapi.PreExecNewPodActionPodAnnotation + phaseAnnotation = deployapi.PreExecNewPodActionPodPhaseAnnotation + case lifecycle.PostDeploymentContext: + podAnnotation = deployapi.PostExecNewPodActionPodAnnotation + phaseAnnotation = deployapi.PostExecNewPodActionPodPhaseAnnotation + } + return podAnnotation, phaseAnnotation +} + +// labelForDeployment builds a string identifier for a DeploymentConfig. +func labelForDeployment(deployment *kapi.ReplicationController) string { + return fmt.Sprintf("%s/%s", deployment.Namespace, deployment.Name) +} + +// PodClient abstracts access to pods. +type PodClient interface { + CreatePod(namespace string, pod *kapi.Pod) (*kapi.Pod, error) +} + +// PodClientImpl is a pluggable PodClient. +type PodClientImpl struct { + CreatePodFunc func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) +} + +func (i *PodClientImpl) CreatePod(namespace string, pod *kapi.Pod) (*kapi.Pod, error) { + return i.CreatePodFunc(namespace, pod) +} diff --git a/pkg/deploy/controller/lifecycle/manager.go b/pkg/deploy/controller/lifecycle/manager.go new file mode 100644 index 000000000000..e4ef1f16aac7 --- /dev/null +++ b/pkg/deploy/controller/lifecycle/manager.go @@ -0,0 +1,151 @@ +package lifecycle + +import ( + "fmt" + + kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + + deployapi "github.com/openshift/origin/pkg/deploy/api" +) + +// DeploymentContext informs the manager which Lifecycle point is being handled. +type DeploymentContext string + +const ( + // PreDeploymentContext refers to Lifecycle.Pre + PreDeploymentContext DeploymentContext = "PreDeploymentContext" + // PostDeploymentContext refers to Lifecycle.Post + PostDeploymentContext DeploymentContext = "PostDeploymentContext" +) + +// Interface provides deployment controllers with a way to execute and track +// lifecycle actions. +// +// This interface abstracts action policy handling; users should assume +// Complete, Failed, and CompleteWithErrors are terminal states, and that any +// request to retry has already been accounted for. Users should not attempt +// to retry actions with status Failed or CompleteWithErrors. +// +// Users should not be concerned with whether a given lifecycle action is +// actually defined on a deployment; calls to execute non-existent actions +// will no-op, and status for non-existent actions will appear to be Complete. +type Interface interface { + // Execute executes the deployment lifecycle action for the given context. + // If no action is defined, Execute should return nil. + Execute(context DeploymentContext, deployment *kapi.ReplicationController) error + // Status returns the status of the lifecycle action for the deployment. If + // no action is defined for the given context, Status returns Complete. If + // the action finished, one of the following will be returned: + // + // 1. Complete: If the action succeeded or doesn't exist + // 2. Failed: If the action failed and the policy is not configured to + // ignore failures + // 3. CompleteWithErrors: If the action failed and the policy is configured + // to ignore failures + // + // If the status couldn't be determined, an error is returned. + Status(context DeploymentContext, deployment *kapi.ReplicationController) (deployapi.DeploymentLifecycleStatus, error) +} + +// Plugin knows how to execute lifecycle handlers and report their status. +// +// Plugins are expected to report actual status, NOT policy based status. +type Plugin interface { + // CanHandle should return true if the plugin knows how to execute handler. + CanHandle(handler *deployapi.Handler) bool + // Execute executes handler in the given context for deployment. + Execute(context DeploymentContext, handler *deployapi.Handler, deployment *kapi.ReplicationController, config *deployapi.DeploymentConfig) error + // Status should report the actual status of the action without taking into + // account failure policies. + Status(context DeploymentContext, handler *deployapi.Handler, deployment *kapi.ReplicationController) deployapi.DeploymentLifecycleStatus +} + +// LifecycleManager implements a pluggable lifecycle.Interface which handles +// the high level details of lifecyle action execution such as decoding +// DeploymentConfigs and implementing the lifecycle.Interface contract for +// policy based status reporting using the actual status returned from +// plugins. +type LifecycleManager struct { + // Plugins execute specific handler instances. + Plugins []Plugin + // DecodeConfig knows how to decode the deploymentConfig from a deployment's annotations. + DecodeConfig func(deployment *kapi.ReplicationController) (*deployapi.DeploymentConfig, error) +} + +var _ Interface = &LifecycleManager{} + +// Execute implements Interface. +func (m *LifecycleManager) Execute(context DeploymentContext, deployment *kapi.ReplicationController) error { + // Decode the config + config, err := m.DecodeConfig(deployment) + if err != nil { + return err + } + + // If there's no handler, no-op + handler := handlerFor(context, config) + if handler == nil { + return nil + } + + plugin, err := m.pluginFor(handler) + if err != nil { + return err + } + + return plugin.Execute(context, handler, deployment, config) +} + +// Status implements Interface. +func (m *LifecycleManager) Status(context DeploymentContext, deployment *kapi.ReplicationController) (deployapi.DeploymentLifecycleStatus, error) { + // Decode the config + config, err := m.DecodeConfig(deployment) + if err != nil { + return "", nil + } + + handler := handlerFor(context, config) + if handler == nil { + return deployapi.DeploymentLifecycleStatusComplete, nil + } + + plugin, err := m.pluginFor(handler) + if err != nil { + return "", err + } + + status := plugin.Status(context, handler, deployment) + if status == deployapi.DeploymentLifecycleStatusFailed && + handler.FailurePolicy == deployapi.HandlerFailurePolicyIgnore { + status = deployapi.DeploymentLifecycleStatusCompleteWithErrors + } + return status, nil +} + +// pluginFor finds a plugin which knows how to deal with handler. +func (m *LifecycleManager) pluginFor(handler *deployapi.Handler) (Plugin, error) { + for _, plugin := range m.Plugins { + if plugin.CanHandle(handler) { + return plugin, nil + } + } + + return nil, fmt.Errorf("no plugin registered for handler: %#v", handler) +} + +// handlerFor finds any handler in config for the given context. +func handlerFor(context DeploymentContext, config *deployapi.DeploymentConfig) *deployapi.Handler { + if config.Template.Strategy.Lifecycle == nil { + return nil + } + + // Find any right handler given the context + var handler *deployapi.Handler + switch context { + case PreDeploymentContext: + handler = config.Template.Strategy.Lifecycle.Pre + case PostDeploymentContext: + handler = config.Template.Strategy.Lifecycle.Post + } + return handler +}