18 changes: 16 additions & 2 deletions cmd/machine-config-daemon/start.go
@@ -67,6 +67,10 @@ func runStartCmd(cmd *cobra.Command, args []string) {
glog.Fatalf("unable to verify rootMount %s exists: %s", startOpts.rootMount, err)
}

stopCh := make(chan struct{})
defer close(stopCh)

ctx := common.CreateControllerContext(cb, stopCh, componentName)
// create the daemon instance. this also initializes kube client items
// which need to come from the container and not the chroot.
daemon, err := daemon.New(
@@ -77,6 +81,7 @@ func runStartCmd(cmd *cobra.Command, args []string) {
cb.MachineConfigClientOrDie(componentName),
cb.KubeClientOrDie(componentName),
daemon.NewFileSystemClient(),
ctx.KubeInformerFactory.Core().V1().Nodes(),
)
if err != nil {
glog.Fatalf("failed to initialize daemon: %v", err)
@@ -92,8 +97,17 @@ func runStartCmd(cmd *cobra.Command, args []string) {
glog.Fatalf("unable to change directory to /: %s", err)
}

stopCh := make(chan struct{})
defer close(stopCh)
glog.Info("Starting MachineConfigDaemon")
defer glog.Info("Shutting down MachineConfigDaemon")

err = daemon.CheckStateOnBoot(stopCh)
if err != nil {
glog.Fatalf("error checking initial state of node: %v", err)
}

ctx.KubeInformerFactory.Start(ctx.Stop)
Member:
IIUC, there's a race condition here if the node annotation is modified after CheckStateOnBoot(), but before cache is synced and we can start receiving callbacks, right?

Here's an idea: we spawn another goroutine that just listens for notifications on a channel before doing triggerUpdate(). Then in handleUpdate we send a notification. But crucially, we also send a notification after WaitForCacheSync() is done if annotations don't match. That way updates are still always serialized, and we close the race condition.
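A minimal sketch of that channel-based serialization, written against the Daemon type from this PR (startUpdateWorker and the channel are hypothetical names, not code from this changeset):

```go
// Hypothetical helper: a single worker goroutine is the only caller of
// triggerUpdate(), so the post-WaitForCacheSync check and informer callbacks
// can never run an update concurrently; they only send notifications.
func (dn *Daemon) startUpdateWorker(stop <-chan struct{}) (notify func()) {
	// Capacity 1 lets notifications coalesce while an update is in flight.
	updateRequests := make(chan struct{}, 1)

	go func() {
		for {
			select {
			case <-stop:
				return
			case <-updateRequests:
				if err := dn.triggerUpdate(); err != nil {
					glog.Errorf("Marking degraded due to: %v", err)
				}
			}
		}
	}()

	return func() {
		select {
		case updateRequests <- struct{}{}:
		default: // an update is already pending; drop the duplicate signal
		}
	}
}
```

Under this sketch, Run() would call notify := dn.startUpdateWorker(stop) right after WaitForCacheSync(), send one notification immediately if the desired and current config annotations already differ, and handleNodeUpdate would call notify() instead of triggerUpdate() directly.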

Contributor Author:
Yes, I thought about this as well, but while testing it seems that the notifications are still delivered once the informer actually starts listening (maybe I'm misunderstanding; I used to have an addNode callback as well, which triggered despite the event having happened before the informer started listening).

Also, during testing, I found that the callback fires quite frequently for the node (once every couple of seconds), so updates are generally caught eventually anyway (since we only care about differences between CC and DC).

close(ctx.KubeInformersStarted)

err = daemon.Run(stopCh)
if err != nil {
glog.Fatalf("failed to run: %v", err)
149 changes: 95 additions & 54 deletions pkg/daemon/daemon.go
@@ -17,6 +17,10 @@ import (
"github.com/vincent-petithory/dataurl"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
corev1 "k8s.io/api/core/v1"
coreinformersv1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"
corelisterv1 "k8s.io/client-go/listers/core/v1"
)

// Daemon is the dispatch point for the functions of the agent on the
@@ -47,6 +51,11 @@ type Daemon struct {

// rootMount is the location for the MCD to chroot in
rootMount string

// nodeLister is used to watch for updates via the informer
nodeLister corelisterv1.NodeLister

nodeListerSynced cache.InformerSynced
}

const (
@@ -68,6 +77,7 @@ func New(
client mcfgclientset.Interface,
kubeClient kubernetes.Interface,
fileSystemClient FileSystemClient,
nodeInformer coreinformersv1.NodeInformer,
) (*Daemon, error) {
loginClient, err := login1.New()
if err != nil {
@@ -84,7 +94,7 @@ func New(
}
glog.Infof("Booted osImageURL: %s (%s)", osImageURL, osVersion)

return &Daemon{
dn := &Daemon{
name: nodeName,
OperatingSystem: operatingSystem,
NodeUpdaterClient: nodeUpdaterClient,
Expand All @@ -94,17 +104,43 @@ func New(
rootMount: rootMount,
fileSystemClient: fileSystemClient,
bootedOSImageURL: osImageURL,
}, nil
}

nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
Member:
No update needed, but I'm curious: Was there a specific reason to do the informer setup after the Daemon creation? Or is it more so it reads better to you this way rather than:

return  &Daemon{
        ....
        nodeLister:       nodeInformer.Lister(),
        nodeListerSynced: nodeInformer.Informer().HasSynced,
}, nil

Contributor Author:
I noticed that elsewhere we split off the Lister, etc. (e.g. https://github.com/openshift/machine-config-operator/blob/master/pkg/controller/node/node_controller.go#L105). I can change it to your suggested method if you'd like.

Member:
No this is fine. I was just curious 😄

UpdateFunc: dn.handleNodeUpdate,
})

dn.nodeLister = nodeInformer.Lister()
dn.nodeListerSynced = nodeInformer.Informer().HasSynced

return dn, nil
}

// Run watches the annotations on the machine until they indicate that we need
// an update. then it triggers an update of the machine. currently, the update
// function shouldn't return, and should just reboot the node, unless an error
// occurs, in which case it will return the error up the call stack.
// Run finishes informer setup and then blocks, and the informer will be
// responsible for triggering callbacks to handle updates. Successful
// updates shouldn't return, and should just reboot the node.
func (dn *Daemon) Run(stop <-chan struct{}) error {
glog.Info("Starting MachineConfigDaemon")
defer glog.Info("Shutting down MachineConfigDaemon")
if !cache.WaitForCacheSync(stop, dn.nodeListerSynced) {
glog.Error("Marking degraded due to: failure to sync caches")
return setUpdateDegraded(dn.kubeClient.CoreV1().Nodes(), dn.name)
}

<-stop

// Run() should block on the above <-stop until an update is detected,
// which is handled by the callbacks.
return nil
}

// CheckStateOnBoot is responsible for checking whether the node has
// degraded, and if not, whether an update is required immediately.
// The flow goes something like this -
// 1. Sanity check if we're in a degraded state. If yes, handle appropriately.
// 2. we restarted for some reason. the happy path reason we restarted is
// because of a machine reboot. validate the current machine state is the
// desired machine state. if we aren't try updating again. if we are, update
// the current state annotation accordingly.
func (dn *Daemon) CheckStateOnBoot(stop <-chan struct{}) error {
// sanity check we're not already in a degraded state
if state, err := getNodeAnnotationExt(dn.kubeClient.CoreV1().Nodes(), dn.name, MachineConfigDaemonStateAnnotationKey, true); err != nil {
// try to set to degraded... because we failed to check if we're degraded
Expand All @@ -121,64 +157,69 @@ func (dn *Daemon) Run(stop <-chan struct{}) error {
}
}

if err := dn.process(); err != nil {
Member:
Why change the run() and process() split here? For one thing it makes the error handling a bit less painful in all the calls (i.e. we're just marking degraded on error in one spot instead of on all the out paths).

Contributor Author:
My initial thought was that process() was more of a loop processing commands, whereas now it just blocks at the <-stop and callbacks happen instead. This is also somewhat consistent with how we use informers elsewhere in this repo. I have no strong feelings either way, so I can change it if you feel that makes more sense.

Member:
IMHO it's ok that Run and process are not split in this PR, since the work in #126 was going to have to change process and Run significantly anyway.

// validate machine state
isDesired, dcAnnotation, err := dn.isDesiredMachineState()
if err != nil {
glog.Errorf("Marking degraded due to: %v", err)
return setUpdateDegraded(dn.kubeClient.CoreV1().Nodes(), dn.name)
}

if isDesired {
// we got the machine state we wanted. set the update complete!
if err := dn.completeUpdate(dcAnnotation); err != nil {
glog.Errorf("Marking degraded due to: %v", err)
return setUpdateDegraded(dn.kubeClient.CoreV1().Nodes(), dn.name)
}
} else if err := dn.triggerUpdate(); err != nil {
return err
}

return nil
}

// process is the main loop that actually does all the work. the flow goes
// something like this -
// 1. we restarted for some reason. the happy path reason we restarted is
// because of a machine reboot. validate the current machine state is the
// desired machine state. if we aren't try updating again. if we are, update
// the current state annotation accordingly.
// 2. watch the desired config annotation, waiting for an update to be
// requested by the controller.
// 3. if an update is requested by the controller, we assume that that means
// something changed and apply the desired config no matter what.
// 4. the update function doesn't return right now, but at some point in the
// future if a reboot isn't required for an update it will. if it returns,
// validate the machine state and set the update to done.
//
// the only reason this function will return is if an error occurs. otherwise it
// will keep trying to update the machine until it reboots.
func (dn *Daemon) process() error {
for {
// validate machine state
isDesired, dcAnnotation, err := dn.isDesiredMachineState()
if err != nil {
return err
}
// handleNodeUpdate is the handler for informer callbacks detecting node
// changes. If an update is requested by the controller, we assume that
// that means something changed and apply the desired config no matter what.
// Also note that we only care about node updates, not creation or deletion.
func (dn *Daemon) handleNodeUpdate(old, cur interface{}) {
node := cur.(*corev1.Node)

// First check if the node that was updated is this daemon's node
if (node.Name != dn.name) {
// The node that was changed was not ours
return
}

if isDesired {
// we got the machine state we wanted. set the update complete!
if err := dn.completeUpdate(dcAnnotation); err != nil {
return err
}
// Then check we're not already in a degraded state.
if state, err := getNodeAnnotation(dn.kubeClient.CoreV1().Nodes(), dn.name, MachineConfigDaemonStateAnnotationKey); err != nil {
// try to set to degraded... because we failed to check if we're degraded
glog.Errorf("Marking degraded due to: %v", err)
setUpdateDegraded(dn.kubeClient.CoreV1().Nodes(), dn.name)
Member:
I think I'd rather we stop watching and exit on error conditions in this callback function and instead only have the "sleep on Degraded" logic at the beginning of the daemon.

Contributor Author (@yuqi-zhang, Oct 23, 2018):
Sure, should I maybe instead not start the informer if we are degraded? So we never even get to this place?

Member (@jlebon, Oct 23, 2018):
Definitely! (I thought that was already the case -- when are callbacks actually active? Is it at nodeInformer.Informer().AddEventHandler() time? In that case, yeah we should move that to after checking if the node is even degraded.)

But also in this function if the node is degraded, let's just stop watching for updates and exit?
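As far as I can tell, handlers on a shared informer only start receiving events once the factory is started (the ctx.KubeInformerFactory.Start call in start.go), so moving registration after the degraded check would look roughly like this sketch (checkDegradedAndWatch is a placeholder name, not part of this PR):

```go
// Hypothetical ordering: consult the degraded annotation first and only wire up
// the update callback for a healthy node; a degraded node never gets callbacks.
func (dn *Daemon) checkDegradedAndWatch(nodeInformer coreinformersv1.NodeInformer) error {
	state, err := getNodeAnnotation(dn.kubeClient.CoreV1().Nodes(), dn.name, MachineConfigDaemonStateAnnotationKey)
	if err != nil {
		return err
	}
	if state == MachineConfigDaemonStateDegraded {
		// Degraded: don't register a handler at all; the daemon just sleeps.
		glog.Info("Node is degraded; not watching for config updates")
		return nil
	}
	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		UpdateFunc: dn.handleNodeUpdate,
	})
	return nil
}
```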

return
} else if state == MachineConfigDaemonStateDegraded {
// Just return since we want to continue sleeping
return
}

// now wait until we need another one.
glog.V(2).Infof("Watching for node annotation updates on %q", dn.name)
if err := waitUntilUpdate(dn.kubeClient.CoreV1().Nodes(), dn.name); err != nil {
return err
}
}
// Detect if there is an update
if (node.Annotations[DesiredMachineConfigAnnotationKey] == node.Annotations[CurrentMachineConfigAnnotationKey]) {
// No actual update to the config
return
}

// either the machine state isn't what we wanted and we should try
// again, or the machine state is what we wanted, and now another update
// is was triggered.
if err := dn.triggerUpdate(); err != nil {
return err
// The desired machine config has changed, trigger update
if err := dn.triggerUpdate(); err != nil {
Member:
Do I understand correctly that there's nothing preventing this from running at the same time as the initial triggerUpdate()? Or do callbacks not actually occur until after cache.WaitForCacheSync()?

Contributor Author:
Good point, I haven't seen this cause an issue/race condition (even when there are repeated failed updates so it should have triggered), but let me double check.
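If that overlap did turn out to be possible, one simple way to rule it out, independent of when callbacks start firing, would be a mutex that serializes every call to triggerUpdate(); a sketch under the assumption of a new updateMu sync.Mutex field on Daemon (an alternative to the channel approach above, not something this PR implements):

```go
// Hypothetical wrapper: with an updateMu sync.Mutex field added to Daemon, the
// boot-time update and any callback-driven update can never run concurrently.
func (dn *Daemon) triggerUpdateSerialized() error {
	dn.updateMu.Lock()
	defer dn.updateMu.Unlock()
	return dn.triggerUpdate()
}
```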

glog.Errorf("Marking degraded due to: %v", err)
if errSet := setUpdateDegraded(dn.kubeClient.CoreV1().Nodes(), dn.name); errSet != nil {
glog.Errorf("Futher error attempting to set the node to degraded: %v", errSet)
}

// we managed to update the machine without rebooting. in this case, we
// basically just restart the logic, but working under the assumption
// that everything is already initialized for us, so we just go to the
// top
glog.V(2).Infof("Successfully updated without reboot")
// reboot the node, which will catch the degraded state and sleep
dn.reboot()
}

// we managed to update the machine without rebooting. in this case,
// continue as usual waiting for the next update
glog.V(2).Infof("Successfully updated without reboot")
}

// completeUpdate does all the stuff required to finish an update. right now, it
52 changes: 0 additions & 52 deletions pkg/daemon/node.go
@@ -7,8 +7,6 @@ import (

"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/watch"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/util/retry"
)
@@ -19,45 +17,6 @@ const (
InitialNodeAnnotationsFilePath = "/etc/machine-config-daemon/node-annotations.json"
)

// waitUntilUpdate blocks until the desiredConfig annotation doesn't match the
// currentConfig annotation, which indicates that there is an update available
// for the node.
func waitUntilUpdate(client corev1.NodeInterface, node string) error {
n, err := client.Get(node, metav1.GetOptions{})
if err != nil {
return err
}

watcher, err := client.Watch(metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", node).String(),
ResourceVersion: n.ResourceVersion,
})
if err != nil {
return fmt.Errorf("Failed to watch self node (%q): %v", node, err)
}

// make sure the condition isn't already true
dc, err := getNodeAnnotation(client, node, DesiredMachineConfigAnnotationKey)
if err != nil {
return err
}
cc, err := getNodeAnnotation(client, node, CurrentMachineConfigAnnotationKey)
if err != nil || cc == "" {
return err
}
if dc != cc {
return nil
}

// for now, we wait forever. that might not be the best long-term strategy.
_, err = watch.Until(0, watcher, updateWatcher)
if err != nil {
return fmt.Errorf("Failed to watch for update request: %v", err)
}

return nil
}

// setConfig sets the given annotation key, value pair.
func setNodeAnnotations(client corev1.NodeInterface, node string, m map[string]string) error {
return updateNodeRetry(client, node, func(node *v1.Node) {
@@ -120,17 +79,6 @@ func getNodeAnnotationExt(client corev1.NodeInterface, node string, k string, al
return v, nil
}

// updateWatcher is the handler for the watch event.
func updateWatcher(event watch.Event) (bool, error) {
switch event.Type {
case watch.Modified:
node := event.Object.(*v1.Node)
return node.Annotations[DesiredMachineConfigAnnotationKey] != node.Annotations[CurrentMachineConfigAnnotationKey], nil
}

return false, nil
}

// updateNodeRetry calls f to update a node object in Kubernetes.
// It will attempt to update the node by applying f to it up to DefaultBackoff
// number of times.