diff --git a/cmd/machine-config-daemon/start.go b/cmd/machine-config-daemon/start.go
index eca61db35f..e98a58efdb 100644
--- a/cmd/machine-config-daemon/start.go
+++ b/cmd/machine-config-daemon/start.go
@@ -67,6 +67,10 @@ func runStartCmd(cmd *cobra.Command, args []string) {
 		glog.Fatalf("unable to verify rootMount %s exists: %s", startOpts.rootMount, err)
 	}
 
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+
+	ctx := common.CreateControllerContext(cb, stopCh, componentName)
 	// create the daemon instance. this also initializes kube client items
 	// which need to come from the container and not the chroot.
 	daemon, err := daemon.New(
@@ -77,6 +81,7 @@ func runStartCmd(cmd *cobra.Command, args []string) {
 		cb.MachineConfigClientOrDie(componentName),
 		cb.KubeClientOrDie(componentName),
 		daemon.NewFileSystemClient(),
+		ctx.KubeInformerFactory.Core().V1().Nodes(),
 	)
 	if err != nil {
 		glog.Fatalf("failed to initialize daemon: %v", err)
@@ -92,8 +97,17 @@ func runStartCmd(cmd *cobra.Command, args []string) {
 		glog.Fatalf("unable to change directory to /: %s", err)
 	}
 
-	stopCh := make(chan struct{})
-	defer close(stopCh)
+	glog.Info("Starting MachineConfigDaemon")
+	defer glog.Info("Shutting down MachineConfigDaemon")
+
+	err = daemon.CheckStateOnBoot(stopCh)
+	if err != nil {
+		glog.Fatalf("error checking initial state of node: %v", err)
+	}
+
+	ctx.KubeInformerFactory.Start(ctx.Stop)
+	close(ctx.KubeInformersStarted)
+
 	err = daemon.Run(stopCh)
 	if err != nil {
 		glog.Fatalf("failed to run: %v", err)
diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go
index 4781cc6c0c..c9af83852b 100644
--- a/pkg/daemon/daemon.go
+++ b/pkg/daemon/daemon.go
@@ -17,6 +17,10 @@ import (
 	"github.com/vincent-petithory/dataurl"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
+	corev1 "k8s.io/api/core/v1"
+	coreinformersv1 "k8s.io/client-go/informers/core/v1"
+	"k8s.io/client-go/tools/cache"
+	corelisterv1 "k8s.io/client-go/listers/core/v1"
 )
 
 // Daemon is the dispatch point for the functions of the agent on the
@@ -47,6 +51,11 @@ type Daemon struct {
 
 	// rootMount is the location for the MCD to chroot in
 	rootMount string
+
+	// nodeLister is used to watch for updates via the informer
+	nodeLister corelisterv1.NodeLister
+
+	nodeListerSynced cache.InformerSynced
 }
 
 const (
@@ -68,6 +77,7 @@ func New(
 	client mcfgclientset.Interface,
 	kubeClient kubernetes.Interface,
 	fileSystemClient FileSystemClient,
+	nodeInformer coreinformersv1.NodeInformer,
 ) (*Daemon, error) {
 	loginClient, err := login1.New()
 	if err != nil {
@@ -84,7 +94,7 @@ func New(
 	}
 	glog.Infof("Booted osImageURL: %s (%s)", osImageURL, osVersion)
 
-	return &Daemon{
+	dn := &Daemon{
 		name:              nodeName,
 		OperatingSystem:   operatingSystem,
 		NodeUpdaterClient: nodeUpdaterClient,
@@ -94,17 +104,43 @@ func New(
 		rootMount:        rootMount,
 		fileSystemClient: fileSystemClient,
 		bootedOSImageURL: osImageURL,
-	}, nil
+	}
+
+	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		UpdateFunc: dn.handleNodeUpdate,
+	})
+
+	dn.nodeLister = nodeInformer.Lister()
+	dn.nodeListerSynced = nodeInformer.Informer().HasSynced
+
+	return dn, nil
 }
 
-// Run watches the annotations on the machine until they indicate that we need
-// an update. then it triggers an update of the machine. currently, the update
-// function shouldn't return, and should just reboot the node, unless an error
-// occurs, in which case it will return the error up the call stack.
+// Run finishes informer setup and then blocks, and the informer will be
+// responsible for triggering callbacks to handle updates. Successful
+// updates shouldn't return, and should just reboot the node.
 func (dn *Daemon) Run(stop <-chan struct{}) error {
-	glog.Info("Starting MachineConfigDaemon")
-	defer glog.Info("Shutting down MachineConfigDaemon")
+	if !cache.WaitForCacheSync(stop, dn.nodeListerSynced) {
+		glog.Error("Marking degraded due to: failure to sync caches")
+		return setUpdateDegraded(dn.kubeClient.CoreV1().Nodes(), dn.name)
+	}
+	<-stop
+
+	// Run() should block on the above <-stop until an update is detected,
+	// which is handled by the callbacks.
+	return nil
+}
+
+// CheckStateOnBoot is responsible for checking whether the node has
+// degraded, and if not, whether an update is required immediately.
+// The flow goes something like this -
+// 1. Sanity check if we're in a degraded state. If yes, handle appropriately.
+// 2. We restarted for some reason. The happy path is that we restarted
+//    because of a machine reboot. Validate that the current machine state
+//    is the desired machine state. If it isn't, try updating again. If it
+//    is, update the current state annotation accordingly.
+func (dn *Daemon) CheckStateOnBoot(stop <-chan struct{}) error {
 	// sanity check we're not already in a degraded state
 	if state, err := getNodeAnnotationExt(dn.kubeClient.CoreV1().Nodes(), dn.name, MachineConfigDaemonStateAnnotationKey, true); err != nil {
 		// try to set to degraded... because we failed to check if we're degraded
@@ -121,64 +157,69 @@ func (dn *Daemon) Run(stop <-chan struct{}) error {
 		}
 	}
 
-	if err := dn.process(); err != nil {
+	// validate machine state
+	isDesired, dcAnnotation, err := dn.isDesiredMachineState()
+	if err != nil {
 		glog.Errorf("Marking degraded due to: %v", err)
 		return setUpdateDegraded(dn.kubeClient.CoreV1().Nodes(), dn.name)
 	}
 
+	if isDesired {
+		// we got the machine state we wanted. set the update complete!
+		if err := dn.completeUpdate(dcAnnotation); err != nil {
+			glog.Errorf("Marking degraded due to: %v", err)
+			return setUpdateDegraded(dn.kubeClient.CoreV1().Nodes(), dn.name)
+		}
+	} else if err := dn.triggerUpdate(); err != nil {
+		return err
+	}
+
 	return nil
 }
 
-// process is the main loop that actually does all the work. the flow goes
-// something like this -
-// 1. we restarted for some reason. the happy path reason we restarted is
-// because of a machine reboot. validate the current machine state is the
-// desired machine state. if we aren't try updating again. if we are, update
-// the current state annotation accordingly.
-// 2. watch the desired config annotation, waiting for an update to be
-// requested by the controller.
-// 3. if an update is requested by the controller, we assume that that means
-// something changed and apply the desired config no matter what.
-// 4. the update function doesn't return right now, but at some point in the
-// future if a reboot isn't required for an update it will. if it returns,
-// validate the machine state and set the update to done.
-//
-// the only reason this function will return is if an error occurs. otherwise it
-// will keep trying to update the machine until it reboots.
-func (dn *Daemon) process() error {
-	for {
-		// validate machine state
-		isDesired, dcAnnotation, err := dn.isDesiredMachineState()
-		if err != nil {
-			return err
-		}
+// handleNodeUpdate is the handler for informer callbacks detecting node
+// changes.
+// If an update is requested by the controller, we assume something
+// changed and apply the desired config no matter what. Also note that we
+// only care about node updates, not creation or deletion.
+func (dn *Daemon) handleNodeUpdate(old, cur interface{}) {
+	node := cur.(*corev1.Node)
+
+	// First check if the node that was updated is this daemon's node
+	if node.Name != dn.name {
+		// The node that was changed was not ours
+		return
+	}
 
-		if isDesired {
-			// we got the machine state we wanted. set the update complete!
-			if err := dn.completeUpdate(dcAnnotation); err != nil {
-				return err
-			}
+	// Then check we're not already in a degraded state.
+	if state, err := getNodeAnnotation(dn.kubeClient.CoreV1().Nodes(), dn.name, MachineConfigDaemonStateAnnotationKey); err != nil {
+		// try to set to degraded... because we failed to check if we're degraded
+		glog.Errorf("Marking degraded due to: %v", err)
+		setUpdateDegraded(dn.kubeClient.CoreV1().Nodes(), dn.name)
+		return
+	} else if state == MachineConfigDaemonStateDegraded {
+		// Just return since we want to continue sleeping
+		return
+	}
 
-			// now wait until we need another one.
-			glog.V(2).Infof("Watching for node annotation updates on %q", dn.name)
-			if err := waitUntilUpdate(dn.kubeClient.CoreV1().Nodes(), dn.name); err != nil {
-				return err
-			}
-		}
+	// Detect if there is an update
+	if node.Annotations[DesiredMachineConfigAnnotationKey] == node.Annotations[CurrentMachineConfigAnnotationKey] {
+		// No actual update to the config
+		return
+	}
 
-		// either the machine state isn't what we wanted and we should try
-		// again, or the machine state is what we wanted, and now another update
-		// is was triggered.
-		if err := dn.triggerUpdate(); err != nil {
-			return err
+	// The desired machine config has changed; trigger an update
+	if err := dn.triggerUpdate(); err != nil {
+		glog.Errorf("Marking degraded due to: %v", err)
+		if errSet := setUpdateDegraded(dn.kubeClient.CoreV1().Nodes(), dn.name); errSet != nil {
+			glog.Errorf("Further error attempting to set the node to degraded: %v", errSet)
 		}
-
-		// we managed to update the machine without rebooting. in this case, we
-		// basically just restart the logic, but working under the assumption
-		// that everything is already initialized for us, so we just go to the
-		// top
-		glog.V(2).Infof("Successfully updated without reboot")
+		// reboot the node, which will catch the degraded state and sleep
+		dn.reboot()
 	}
+
+	// we managed to update the machine without rebooting. in this case,
+	// continue as usual waiting for the next update
+	glog.V(2).Infof("Successfully updated without reboot")
 }
 
 // completeUpdate does all the stuff required to finish an update. right now, it
diff --git a/pkg/daemon/node.go b/pkg/daemon/node.go
index 5e4f592b12..a343dcd6bb 100644
--- a/pkg/daemon/node.go
+++ b/pkg/daemon/node.go
@@ -7,8 +7,6 @@ import (
 
 	"k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/fields"
-	"k8s.io/apimachinery/pkg/watch"
 	corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
 	"k8s.io/client-go/util/retry"
 )
@@ -19,45 +17,6 @@ const (
 	InitialNodeAnnotationsFilePath = "/etc/machine-config-daemon/node-annotations.json"
 )
 
-// waitUntilUpdate blocks until the desiredConfig annotation doesn't match the
-// currentConfig annotation, which indicates that there is an update available
-// for the node.
-func waitUntilUpdate(client corev1.NodeInterface, node string) error {
-	n, err := client.Get(node, metav1.GetOptions{})
-	if err != nil {
-		return err
-	}
-
-	watcher, err := client.Watch(metav1.ListOptions{
-		FieldSelector:   fields.OneTermEqualSelector("metadata.name", node).String(),
-		ResourceVersion: n.ResourceVersion,
-	})
-	if err != nil {
-		return fmt.Errorf("Failed to watch self node (%q): %v", node, err)
-	}
-
-	// make sure the condition isn't already true
-	dc, err := getNodeAnnotation(client, node, DesiredMachineConfigAnnotationKey)
-	if err != nil {
-		return err
-	}
-	cc, err := getNodeAnnotation(client, node, CurrentMachineConfigAnnotationKey)
-	if err != nil || cc == "" {
-		return err
-	}
-	if dc != cc {
-		return nil
-	}
-
-	// for now, we wait forever. that might not be the best long-term strategy.
-	_, err = watch.Until(0, watcher, updateWatcher)
-	if err != nil {
-		return fmt.Errorf("Failed to watch for update request: %v", err)
-	}
-
-	return nil
-}
-
 // setConfig sets the given annotation key, value pair.
 func setNodeAnnotations(client corev1.NodeInterface, node string, m map[string]string) error {
 	return updateNodeRetry(client, node, func(node *v1.Node) {
@@ -120,17 +79,6 @@ func getNodeAnnotationExt(client corev1.NodeInterface, node string, k string, al
 	return v, nil
 }
 
-// updateWatcher is the handler for the watch event.
-func updateWatcher(event watch.Event) (bool, error) {
-	switch event.Type {
-	case watch.Modified:
-		node := event.Object.(*v1.Node)
-		return node.Annotations[DesiredMachineConfigAnnotationKey] != node.Annotations[CurrentMachineConfigAnnotationKey], nil
-	}
-
-	return false, nil
-}
-
 // updateNodeRetry calls f to update a node object in Kubernetes.
 // It will attempt to update the node by applying f to it up to DefaultBackoff
 // number of times.
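
For reviewers unfamiliar with client-go informers: this patch swaps the hand-rolled client.Watch / watch.Until loop deleted from node.go for a shared informer that caches node objects and invokes registered callbacks on changes. Below is a minimal, standalone sketch of that pattern — not the daemon itself — assuming a plain SharedInformerFactory, hypothetical -kubeconfig/-node flags, and literal annotation keys in place of the repo's common.CreateControllerContext helper and annotation constants.

package main

import (
	"flag"
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

// Annotation keys as used by the MCD; literal values assumed here for the sketch.
const (
	desiredConfigAnnotation = "machineconfiguration.openshift.io/desiredConfig"
	currentConfigAnnotation = "machineconfiguration.openshift.io/currentConfig"
)

func main() {
	kubeconfig := flag.String("kubeconfig", "", "path to kubeconfig (hypothetical flag)")
	nodeName := flag.String("node", "", "name of the node to watch (hypothetical flag)")
	flag.Parse()

	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	// The factory maintains a local cache of node objects and dispatches
	// change events to registered handlers, replacing the deleted
	// client.Watch / watch.Until loop.
	factory := informers.NewSharedInformerFactory(client, 30*time.Second)
	nodeInformer := factory.Core().V1().Nodes()

	// Only UpdateFunc is registered, mirroring handleNodeUpdate: the daemon
	// cares about changes to its own node, not creations or deletions.
	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		UpdateFunc: func(old, cur interface{}) {
			node := cur.(*corev1.Node)
			if node.Name != *nodeName {
				return
			}
			desired := node.Annotations[desiredConfigAnnotation]
			current := node.Annotations[currentConfigAnnotation]
			if desired != current {
				fmt.Printf("update pending: %s -> %s\n", current, desired)
			}
		},
	})

	stopCh := make(chan struct{})
	factory.Start(stopCh)

	// Block until the initial cache fill completes, as Daemon.Run does with
	// WaitForCacheSync before parking on <-stop.
	if !cache.WaitForCacheSync(stopCh, nodeInformer.Informer().HasSynced) {
		panic("failed to sync informer cache")
	}

	select {} // park; the informer drives all further work via its callbacks
}

Note the ordering in start.go above: CheckStateOnBoot runs before the informer factory is started, so boot-time state is validated exactly once, and only then do informer callbacks take over update handling.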