diff --git a/cmd/machine-config-daemon/request_reboot.go b/cmd/machine-config-daemon/request_reboot.go
new file mode 100644
index 0000000000..cc06a3df99
--- /dev/null
+++ b/cmd/machine-config-daemon/request_reboot.go
@@ -0,0 +1,41 @@
+package main
+
+import (
+	"flag"
+	"fmt"
+	"os"
+
+	daemon "github.com/openshift/machine-config-operator/pkg/daemon"
+	"github.com/spf13/cobra"
+	"github.com/spf13/pflag"
+)
+
+var requestRebootCmd = &cobra.Command{
+	Use:                   "request-reboot",
+	DisableFlagsInUseLine: true,
+	Short:                 "Request a reboot with the given rationale",
+	Args:                  cobra.ExactArgs(1),
+	Run:                   executeRequestReboot,
+}
+
+// init executes upon import
+func init() {
+	rootCmd.AddCommand(requestRebootCmd)
+	pflag.CommandLine.AddGoFlagSet(flag.CommandLine)
+}
+
+func runRequestReboot(_ *cobra.Command, args []string) error {
+	flag.Set("logtostderr", "true")
+	flag.Parse()
+
+	return daemon.RequestReboot(args[0])
+}
+
+// executeRequestReboot runs the command, exiting non-zero on error
+func executeRequestReboot(cmd *cobra.Command, args []string) {
+	err := runRequestReboot(cmd, args)
+	if err != nil {
+		fmt.Printf("error: %v\n", err)
+		os.Exit(1)
+	}
+}
diff --git a/pkg/apis/machineconfiguration.openshift.io/v1/types.go b/pkg/apis/machineconfiguration.openshift.io/v1/types.go
index 23ba89a1ff..3ce07b0a4b 100644
--- a/pkg/apis/machineconfiguration.openshift.io/v1/types.go
+++ b/pkg/apis/machineconfiguration.openshift.io/v1/types.go
@@ -256,6 +256,9 @@ type MachineConfigPoolStatus struct {
 	// A node is marked degraded if applying a configuration failed.
 	DegradedMachineCount int32 `json:"degradedMachineCount"`
 
+	// RequestedRebootMachineCount is the number of machines which have the reboot-requested annotation set.
+	RequestedRebootMachineCount int32 `json:"requestedRebootMachineCount"`
+
 	// conditions represents the latest available observations of current state.
 	// +optional
 	Conditions []MachineConfigPoolCondition `json:"conditions"`
diff --git a/pkg/controller/node/node_controller.go b/pkg/controller/node/node_controller.go
index 8a5462148e..617655104a 100644
--- a/pkg/controller/node/node_controller.go
+++ b/pkg/controller/node/node_controller.go
@@ -379,6 +379,32 @@ func (ctrl *Controller) deleteMachineConfigPool(obj interface{}) {
 	// TODO(abhinavdahiya): handle deletes.
 }
 
+// reconcileRebootApproval removes the approval annotation if there's no request
+func (ctrl *Controller) reconcileRebootApproval(poolName string, node *corev1.Node) (*corev1.Node, error) {
+	var retNode *corev1.Node
+
+	// If there's a stray approval with no request, delete it
+	_, rebootRequested := node.Annotations[daemonconsts.MachineConfigDaemonRebootRequestedAnnotation]
+	_, rebootApproved := node.Annotations[daemonconsts.MachineConfigDaemonRebootApprovedAnnotation]
+	if !rebootRequested && rebootApproved {
+		glog.Infof("Pool %s: Removing stale reboot approval for node %s", poolName, node.Name)
+		var err error
+		retNode, err = internal.UpdateNodeRetry(ctrl.kubeClient.CoreV1().Nodes(), ctrl.nodeLister, node.Name, func(node *corev1.Node) {
+			_, rebootRequested := node.Annotations[daemonconsts.MachineConfigDaemonRebootRequestedAnnotation]
+			_, rebootApproved := node.Annotations[daemonconsts.MachineConfigDaemonRebootApprovedAnnotation]
+			if !rebootRequested && rebootApproved {
+				delete(node.Annotations, daemonconsts.MachineConfigDaemonRebootApprovedAnnotation)
+			}
+		})
+		if err != nil {
+			return nil, goerrs.Wrapf(err, "updating reboot annotation")
+		}
+		return retNode, nil
+	}
+
+	return nil, nil
+}
+
 func (ctrl *Controller) addNode(obj interface{}) {
 	node := obj.(*corev1.Node)
 	if node.DeletionTimestamp != nil {
@@ -395,6 +421,9 @@ func (ctrl *Controller) addNode(obj interface{}) {
 		return
 	}
 	glog.V(4).Infof("Node %s added", node.Name)
+	if _, err := ctrl.reconcileRebootApproval(pool.Name, node); err != nil {
+		glog.Errorf("%v", err)
+	}
 	ctrl.enqueueMachineConfigPool(pool)
 }
 
@@ -447,7 +476,25 @@ func (ctrl *Controller) updateNode(old, cur interface{}) {
 		}
 	}
 
+	_, oldRebootRequested := oldNode.Annotations[daemonconsts.MachineConfigDaemonRebootRequestedAnnotation]
+	_, newRebootRequested := curNode.Annotations[daemonconsts.MachineConfigDaemonRebootRequestedAnnotation]
+	if oldRebootRequested != newRebootRequested {
+		glog.Infof("Pool %s: node %s changed reboot request state: %v", pool.Name, curNode.Name, newRebootRequested)
+		changed = true
+	}
+
+	newNode, err := ctrl.reconcileRebootApproval(pool.Name, curNode)
+	if err != nil {
+		glog.Errorf("%v", err)
+		return
+	}
+	if newNode != nil {
+		curNode = newNode
+		changed = true
+	}
+
 	if !changed {
+		glog.V(4).Infof("No relevant changes to pool %s node %s", pool.Name, curNode.Name)
 		return
 	}
@@ -677,8 +724,11 @@ func (ctrl *Controller) syncMachineConfigPool(key string) error {
 	}
 
 	candidates := getCandidateMachines(pool, nodes, maxunavail)
+	if len(candidates) == 0 {
+		glog.V(3).Infof("Pool %s: No candidates to update", pool.Name)
+	}
 	for _, node := range candidates {
-		if err := ctrl.setDesiredMachineConfigAnnotation(node.Name, pool.Spec.Configuration.Name); err != nil {
+		if err := ctrl.updateCandidateNode(pool.Name, node, pool.Spec.Configuration.Name); err != nil {
 			return err
 		}
 	}
@@ -711,8 +761,16 @@ func (ctrl *Controller) getNodesForPool(pool *mcfgv1.MachineConfigPool) ([]*core
 	return nodes, nil
 }
 
-func (ctrl *Controller) setDesiredMachineConfigAnnotation(nodeName, currentConfig string) error {
-	glog.Infof("Setting node %s to desired config %s", nodeName, currentConfig)
+func (ctrl *Controller) updateCandidateNode(poolName string, node *corev1.Node, currentConfig string) error {
+	nodeName := node.Name
+	_, rebootRequested := node.Annotations[daemonconsts.MachineConfigDaemonRebootRequestedAnnotation]
+	needsConfigChange := node.Annotations[daemonconsts.DesiredMachineConfigAnnotationKey] != currentConfig
+
+	if needsConfigChange {
glog.Infof("Pool %s: Setting node %s to desired config %s", poolName, nodeName, currentConfig) + } else if rebootRequested { + glog.Infof("Pool %s: Approving node %s reboot", poolName, nodeName) + } return clientretry.RetryOnConflict(nodeUpdateBackoff, func() error { oldNode, err := ctrl.kubeClient.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{}) if err != nil { @@ -722,16 +780,27 @@ func (ctrl *Controller) setDesiredMachineConfigAnnotation(nodeName, currentConfi if err != nil { return err } + _, rebootRequested := oldNode.Annotations[daemonconsts.MachineConfigDaemonRebootRequestedAnnotation] + _, rebootApproved := oldNode.Annotations[daemonconsts.MachineConfigDaemonRebootApprovedAnnotation] + needsRebootApproval := rebootRequested && !rebootApproved + needsConfigChange := oldNode.Annotations[daemonconsts.DesiredMachineConfigAnnotationKey] != currentConfig + if !needsRebootApproval && !needsConfigChange { + // The RebootRequested flag was most likely removed, nothing to do then. + return nil + } - newNode := oldNode.DeepCopy() - if newNode.Annotations == nil { - newNode.Annotations = map[string]string{} + curNode := oldNode.DeepCopy() + if curNode.Annotations == nil { + curNode.Annotations = map[string]string{} } - if newNode.Annotations[daemonconsts.DesiredMachineConfigAnnotationKey] == currentConfig { - return nil + if needsConfigChange { + curNode.Annotations[daemonconsts.DesiredMachineConfigAnnotationKey] = currentConfig } - newNode.Annotations[daemonconsts.DesiredMachineConfigAnnotationKey] = currentConfig - newData, err := json.Marshal(newNode) + if rebootRequested { + curNode.Annotations[daemonconsts.MachineConfigDaemonRebootApprovedAnnotation] = "" + } + + newData, err := json.Marshal(curNode) if err != nil { return err } @@ -751,20 +820,25 @@ func getCandidateMachines(pool *mcfgv1.MachineConfigPool, nodesInPool []*corev1. unavail := getUnavailableMachines(nodesInPool) // If we're at capacity, there's nothing to do. if len(unavail) >= maxUnavailable { + glog.V(3).Infof("Pool %s: No progress possible: unavail %v >= maxunavail %v", pool.Name, len(unavail), maxUnavailable) return nil } capacity := maxUnavailable - len(unavail) failingThisConfig := 0 - // We only look at nodes which aren't already targeting our desired config + // We only look at nodes which aren't already targeting our desired config, or + // are requesting a reboot. var nodes []*corev1.Node for _, node := range nodesInPool { - if node.Annotations[daemonconsts.DesiredMachineConfigAnnotationKey] == targetConfig { + _, rebootRequested := node.Annotations[daemonconsts.MachineConfigDaemonRebootRequestedAnnotation] + _, rebootApproved := node.Annotations[daemonconsts.MachineConfigDaemonRebootApprovedAnnotation] + needsRebootApproval := rebootRequested && !rebootApproved + targetingThisConfig := node.Annotations[daemonconsts.DesiredMachineConfigAnnotationKey] == targetConfig + if !needsRebootApproval && targetingThisConfig { if isNodeMCDFailing(node) { failingThisConfig++ } continue } - nodes = append(nodes, node) } @@ -772,9 +846,11 @@ func getCandidateMachines(pool *mcfgv1.MachineConfigPool, nodesInPool []*corev1. // availability - it might be a transient issue, and if the issue // clears we don't want multiple to update at once. 
 	if failingThisConfig >= capacity {
+		glog.V(3).Infof("Pool %s: No progress possible: failingThisConfig %v >= capacity %v", pool.Name, failingThisConfig, capacity)
 		return nil
 	}
 	capacity -= failingThisConfig
+	glog.V(3).Infof("Pool %s: Capacity %v with %v nodes to progress", pool.Name, capacity, len(nodes))
 
 	if len(nodes) < capacity {
 		return nodes
diff --git a/pkg/controller/node/node_controller_test.go b/pkg/controller/node/node_controller_test.go
index ef18f6ab8c..b740412629 100644
--- a/pkg/controller/node/node_controller_test.go
+++ b/pkg/controller/node/node_controller_test.go
@@ -176,7 +176,7 @@ func checkAction(expected, actual core.Action, t *testing.T) {
 		e, _ := expected.(core.PatchAction)
 		expPatch := e.GetPatch()
 		patch := a.GetPatch()
-		assert.Equal(t, expPatch, patch)
+		assert.Equal(t, string(expPatch), string(patch))
 	}
 }
@@ -608,13 +608,12 @@ func assertPatchesNode0ToV1(t *testing.T, actions []core.Action) {
 		t.Fatal(actions)
 	}
 
-	expected := []byte(`{"metadata":{"annotations":{"machineconfiguration.openshift.io/desiredConfig":"v1"}}}`)
+	expected := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"v1"}}}`, daemonconsts.DesiredMachineConfigAnnotationKey)
 	actual := actions[1].(core.PatchAction).GetPatch()
-	assert.Equal(t, expected, actual)
+	assert.Equal(t, expected, string(actual))
 }
 
-func TestSetDesiredMachineConfigAnnotation(t *testing.T) {
-
+func TestUpdateCandidateNode(t *testing.T) {
 	tests := []struct {
 		node       *corev1.Node
 		extraannos map[string]string
@@ -676,6 +675,24 @@ func TestSetDesiredMachineConfigAnnotation(t *testing.T) {
 				t.Fatal(actions)
 			}
 		},
+	}, {
+		node:       newNode("node-0", "v1", "v1"),
+		extraannos: map[string]string{daemonconsts.MachineConfigDaemonRebootRequestedAnnotation: ""},
+		verify: func(actions []core.Action, t *testing.T) {
+			if !assert.Equal(t, 2, len(actions)) {
+				t.Fatal(actions)
+			}
+			if !actions[0].Matches("get", "nodes") || actions[0].(core.GetAction).GetName() != "node-0" {
+				t.Fatal(actions)
+			}
+			if !actions[1].Matches("patch", "nodes") {
+				t.Fatal(actions)
+			}
+
+			expected := fmt.Sprintf(`{"metadata":{"annotations":{"%s":""}}}`, daemonconsts.MachineConfigDaemonRebootApprovedAnnotation)
+			actual := actions[1].(core.PatchAction).GetPatch()
+			assert.Equal(t, expected, string(actual))
+		},
 	}}
 
 	for idx, test := range tests {
@@ -694,7 +711,7 @@ func TestSetDesiredMachineConfigAnnotation(t *testing.T) {
 			c := f.newController()
 
-			err := c.setDesiredMachineConfigAnnotation(test.node.Name, "v1")
+			err := c.updateCandidateNode("piscine", test.node, "v1")
 			if !assert.Nil(t, err) {
 				return
 			}
diff --git a/pkg/controller/node/status.go b/pkg/controller/node/status.go
index 0bcd2bc948..393d33830c 100644
--- a/pkg/controller/node/status.go
+++ b/pkg/controller/node/status.go
@@ -19,6 +19,7 @@ func (ctrl *Controller) syncStatusOnly(pool *mcfgv1.MachineConfigPool) error {
 	newStatus := calculateStatus(pool, nodes)
 	if equality.Semantic.DeepEqual(pool.Status, newStatus) {
+		glog.V(4).Infof("Pool %s: No changes in status", pool.Name)
 		return nil
 	}
 
@@ -50,13 +51,17 @@ func calculateStatus(pool *mcfgv1.MachineConfigPool, nodes []*corev1.Node) mcfgv
 	}
 	degradedMachineCount := int32(len(degradedMachines))
 
+	requestingRebootMachines := getRequestingRebootMachines(nodes)
+	requestingRebootMachinesCount := int32(len(requestingRebootMachines))
+
 	status := mcfgv1.MachineConfigPoolStatus{
-		ObservedGeneration:      pool.Generation,
-		MachineCount:            machineCount,
-		UpdatedMachineCount:     updatedMachineCount,
-		ReadyMachineCount:       readyMachineCount,
-		UnavailableMachineCount: unavailableMachineCount,
-		DegradedMachineCount:    degradedMachineCount,
+		ObservedGeneration:          pool.Generation,
+		MachineCount:                machineCount,
+		UpdatedMachineCount:         updatedMachineCount,
+		ReadyMachineCount:           readyMachineCount,
+		UnavailableMachineCount:     unavailableMachineCount,
+		DegradedMachineCount:        degradedMachineCount,
+		RequestedRebootMachineCount: requestingRebootMachinesCount,
 	}
 
 	status.Configuration = pool.Status.Configuration
@@ -69,8 +74,9 @@ func calculateStatus(pool *mcfgv1.MachineConfigPool, nodes []*corev1.Node) mcfgv
 	allUpdated := updatedMachineCount == machineCount && readyMachineCount == machineCount && unavailableMachineCount == 0
+	rebootRequested := requestingRebootMachinesCount > 0
 
-	if allUpdated {
+	if allUpdated && !rebootRequested {
 		//TODO: update api to only have one condition regarding status of update.
 		updatedMsg := fmt.Sprintf("All nodes are updated with %s", pool.Spec.Configuration.Name)
 		supdated := mcfgv1.NewMachineConfigPoolCondition(mcfgv1.MachineConfigPoolUpdated, corev1.ConditionTrue, "", updatedMsg)
@@ -85,7 +91,13 @@ func calculateStatus(pool *mcfgv1.MachineConfigPool, nodes []*corev1.Node) mcfgv
 	} else {
 		supdated := mcfgv1.NewMachineConfigPoolCondition(mcfgv1.MachineConfigPoolUpdated, corev1.ConditionFalse, "", "")
 		mcfgv1.SetMachineConfigPoolCondition(&status, *supdated)
-		supdating := mcfgv1.NewMachineConfigPoolCondition(mcfgv1.MachineConfigPoolUpdating, corev1.ConditionTrue, "", fmt.Sprintf("All nodes are updating to %s", pool.Spec.Configuration.Name))
+		var msg string
+		if !allUpdated {
+			msg = fmt.Sprintf("All nodes are updating to %s", pool.Spec.Configuration.Name)
+		} else {
+			msg = fmt.Sprintf("Nodes requesting reboot: %d", requestingRebootMachinesCount)
+		}
+		supdating := mcfgv1.NewMachineConfigPoolCondition(mcfgv1.MachineConfigPoolUpdating, corev1.ConditionTrue, "", msg)
 		mcfgv1.SetMachineConfigPoolCondition(&status, *supdating)
 	}
@@ -228,15 +240,31 @@ func isNodeUnavailable(node *corev1.Node) bool {
 	if !isNodeReady(node) {
 		return true
 	}
-	// Ready nodes are not unavailable
+
+	// If a reboot is both requested and approved, then the MCD may act on it,
+	// independent of the config state.
+	_, rebootRequestedSet := node.Annotations[daemonconsts.MachineConfigDaemonRebootRequestedAnnotation]
+	_, rebootApprovedSet := node.Annotations[daemonconsts.MachineConfigDaemonRebootApprovedAnnotation]
+	rebootApproved := rebootRequestedSet && rebootApprovedSet
+
+	// If the node is in a completed config state, it's unavailable
+	// iff reboot is approved.
 	if isNodeDone(node) {
+		return rebootApproved
+	}
+
+	// If it needs reboot approval, it's not unavailable.
+	if rebootRequestedSet && !rebootApprovedSet {
 		return false
 	}
+
 	// Now we know the node isn't ready - the current config must not
 	// equal target. We want to further filter down on the MCD state.
 	// If a MCD is in a terminal (failing) state then we can safely retarget it.
-	// to a different config. Or to say it another way, a node is unavailable
-	// if the MCD is working, or hasn't started work but the configs differ.
+	// to a different config. We ignore reboot approval - this is an assumption
+	// that config changes override reboot approval. Or in other words,
+	// if we have a config change *and* reboot is requested+approved, the MCD
+	// will keep trying to target that config rather than reboot.
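+	//
+	// In summary: an unready node is always unavailable; a node that has
+	// completed its config is unavailable only while a reboot is both
+	// requested and approved; a node waiting on reboot approval stays
+	// available; any other node is unavailable unless its MCD is failing.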
 	return !isNodeMCDFailing(node)
 }
 
@@ -277,3 +305,20 @@ func getDegradedMachines(nodes []*corev1.Node) []*corev1.Node {
 	}
 	return degraded
 }
+
+// getRequestingRebootMachines returns the nodes which have the
+// reboot-requested annotation set.
+func getRequestingRebootMachines(nodes []*corev1.Node) []*corev1.Node {
+	var ret []*corev1.Node
+	for _, node := range nodes {
+		if node.Annotations == nil {
+			continue
+		}
+		_, ok := node.Annotations[daemonconsts.MachineConfigDaemonRebootRequestedAnnotation]
+		if !ok {
+			continue
+		}
+		ret = append(ret, node)
+	}
+	return ret
+}
diff --git a/pkg/controller/node/status_test.go b/pkg/controller/node/status_test.go
index 7062a12b72..d9ceb3a19c 100644
--- a/pkg/controller/node/status_test.go
+++ b/pkg/controller/node/status_test.go
@@ -57,12 +57,15 @@ func newNode(name string, currentConfig, desiredConfig string) *corev1.Node {
 	var annos map[string]string
 	if currentConfig != "" || desiredConfig != "" {
 		var state string
+		annos = map[string]string{}
 		if currentConfig == desiredConfig {
 			state = daemonconsts.MachineConfigDaemonStateDone
 		} else {
 			state = daemonconsts.MachineConfigDaemonStateWorking
+			// For now the unit tests will skip the request/approval cycle
+			annos[daemonconsts.MachineConfigDaemonRebootRequestedAnnotation] = ""
+			annos[daemonconsts.MachineConfigDaemonRebootApprovedAnnotation] = ""
 		}
-		annos = map[string]string{}
 		annos[daemonconsts.CurrentMachineConfigAnnotationKey] = currentConfig
 		annos[daemonconsts.DesiredMachineConfigAnnotationKey] = desiredConfig
 		annos[daemonconsts.MachineConfigDaemonStateAnnotationKey] = state
@@ -115,6 +118,11 @@ func newNodeWithReadyAndDaemonState(name string, currentConfig, desiredConfig st
 		node.Annotations = map[string]string{}
 	}
 	node.Annotations[daemonconsts.MachineConfigDaemonStateAnnotationKey] = dstate
+	// We're skipping the request/approval in the unit tests for now
+	if currentConfig == desiredConfig && dstate == daemonconsts.MachineConfigDaemonStateDegraded {
+		delete(node.Annotations, daemonconsts.MachineConfigDaemonRebootRequestedAnnotation)
+		delete(node.Annotations, daemonconsts.MachineConfigDaemonRebootApprovedAnnotation)
+	}
 	return node
 }
diff --git a/pkg/daemon/constants/constants.go b/pkg/daemon/constants/constants.go
index f25fd4f3c2..2e1781ee67 100644
--- a/pkg/daemon/constants/constants.go
+++ b/pkg/daemon/constants/constants.go
@@ -12,6 +12,10 @@ const (
 	CurrentMachineConfigAnnotationKey = "machineconfiguration.openshift.io/currentConfig"
 	// DesiredMachineConfigAnnotationKey is used to specify the desired MachineConfig for a machine
 	DesiredMachineConfigAnnotationKey = "machineconfiguration.openshift.io/desiredConfig"
+	// MachineConfigDaemonRebootRequestedAnnotation is set by the MCD to signal that the node wants to reboot
+	MachineConfigDaemonRebootRequestedAnnotation = "machineconfiguration.openshift.io/reboot-requested"
+	// MachineConfigDaemonRebootApprovedAnnotation is added by the MCC to signal approval to reboot
+	MachineConfigDaemonRebootApprovedAnnotation = "machineconfiguration.openshift.io/reboot-approved"
 	// MachineConfigDaemonStateAnnotationKey is used to fetch the state of the daemon on the machine.
 	MachineConfigDaemonStateAnnotationKey = "machineconfiguration.openshift.io/state"
 	// MachineConfigDaemonStateWorking is set by daemon when it is applying an update.
@@ -31,6 +35,10 @@ const (
 	// InitialNodeAnnotationsBakPath defines the path of InitialNodeAnnotationsFilePath when the initial bootstrap is done. We leave it around for debugging and reconciling.
 	InitialNodeAnnotationsBakPath = "/etc/machine-config-daemon/node-annotation.json.bak"
 
+	// RequestRebootJournalMessageID is written by `machine-config-daemon request-reboot` into the journal.
+	// It was generated by `systemd-id128 new`.
+	RequestRebootJournalMessageID = "bebd48754ea1425ab458bb22693c696a"
+
 	// EtcPivotFile is used by the `pivot` command
 	// For more information, see https://github.com/openshift/pivot/pull/25/commits/c77788a35d7ee4058d1410e89e6c7937bca89f6c#diff-04c6e90faac2675aa89e2176d2eec7d8R44
 	EtcPivotFile = "/etc/pivot/image-pullspec"
diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go
index 6c8efdabd1..bc88aba2ee 100644
--- a/pkg/daemon/daemon.go
+++ b/pkg/daemon/daemon.go
@@ -23,9 +23,11 @@ import (
 	igntypes "github.com/coreos/ignition/config/v2_2/types"
 	"github.com/golang/glog"
 	drain "github.com/openshift/kubernetes-drain"
+	"github.com/openshift/machine-config-operator/internal"
 	"github.com/openshift/machine-config-operator/lib/resourceread"
 	mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1"
 	"github.com/openshift/machine-config-operator/pkg/daemon/constants"
+	"github.com/openshift/machine-config-operator/pkg/daemon/journal"
 	mcfginformersv1 "github.com/openshift/machine-config-operator/pkg/generated/informers/externalversions/machineconfiguration.openshift.io/v1"
 	mcfglistersv1 "github.com/openshift/machine-config-operator/pkg/generated/listers/machineconfiguration.openshift.io/v1"
 	"github.com/pkg/errors"
@@ -82,6 +84,11 @@ type Daemon struct {
 	// skipReboot skips the reboot after a sync, only valid with onceFrom != ""
 	skipReboot bool
 
+	// journalRebootRequested is set if we detected a journal message requesting a reboot
+	journalRebootRequested bool
+	// selfRebootRequested is set if we sent a journal message to ourselves, used
+	// to avoid spamming the journal.
+	selfRebootRequested bool
 
 	kubeletHealthzEnabled  bool
 	kubeletHealthzEndpoint string
@@ -375,18 +382,105 @@ func (dn *Daemon) syncNode(key string) error {
 		// currently we return immediately here, although
 		// I think we should change this to continue.
 		dn.booting = false
+		glog.Info("Completed MCD initialization")
 		return nil
 	}
 
-	// Pass to the shared update prep method
+	// Check for current != desired config *and* approval to reboot
 	current, desired, err := dn.prepUpdateFromCluster()
 	if err != nil {
 		return errors.Wrapf(err, "prepping update")
 	}
+
+	// Sync the reboot request flag from the journal to the annotation first.
+	_, rebootRequested := dn.node.Annotations[constants.MachineConfigDaemonRebootRequestedAnnotation]
+	if dn.journalRebootRequested && !rebootRequested {
+		node, err := setNodeAnnotations(dn.kubeClient.CoreV1().Nodes(), dn.nodeLister, dn.node.Name, map[string]string{constants.MachineConfigDaemonRebootRequestedAnnotation: ""})
+		if err != nil {
+			return errors.Wrapf(err, "failed to add reboot requested")
+		}
+		dn.node = node
+		rebootRequested = true
+	}
+
+	// If we're not approved for reboot for some reason, we can't apply an update
+	_, rebootApproved := dn.node.Annotations[constants.MachineConfigDaemonRebootApprovedAnnotation]
+	if current != nil || desired != nil {
+		// We are instructed by the node controller to do an update. Let's
+		// first ensure we have the reboot request annotation set. In order
+		// to ensure that the code for *external* reboot requests which operates
+		// via the journal is tested, we use that same API to talk to ourselves.
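+		//
+		// The overall handshake: the MCD sets reboot-requested, the node
+		// controller approves it by setting reboot-approved, the MCD then
+		// drains and reboots, and finally clears both annotations once the
+		// journal for the new boot no longer carries the request.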
+		if !rebootRequested {
+			if !dn.selfRebootRequested {
+				glog.Info("Self-requesting reboot")
+				rationale := fmt.Sprintf("Updating to %s", desired.Name)
+				err := RequestReboot(rationale)
+				if err != nil {
+					return err
+				}
+				dn.selfRebootRequested = true
+			} else {
+				rebootRequests, err := journal.Query("_UID=0", fmt.Sprintf("MESSAGE_ID=%s", constants.RequestRebootJournalMessageID))
+				if err != nil {
+					return err
+				}
+				if len(rebootRequests) > 0 {
+					msg := rebootRequests[0]
+					glog.Infof("Query detected reboot request: %s", msg.Message)
+					dn.journalRebootRequested = true
+					node, err := setNodeAnnotations(dn.kubeClient.CoreV1().Nodes(), dn.nodeLister, dn.node.Name, map[string]string{constants.MachineConfigDaemonRebootRequestedAnnotation: ""})
+					if err != nil {
+						return errors.Wrapf(err, "failed to add reboot requested")
+					}
+					dn.node = node
+				} else {
+					glog.Info("Queued self-reboot request, but haven't received it yet")
+				}
+			}
+			// Note the early return here; we wait
+			// for the reboot request to propagate before starting an update.
+			return nil
+		}
+		if !rebootApproved {
+			glog.Infof("Desired config is %s, but reboot is not approved", desired.Name)
+			return nil
+		}
+		// Start the update.
 		if err := dn.triggerUpdateWithMachineConfig(current, desired); err != nil {
 			return err
 		}
+	} else if rebootRequested {
+		// In this case, we have a request to reboot outside of a config change.
+		if rebootApproved {
+			// If we have the annotations, but there's nothing in the journal, then we must have done
+			// the reboot. Remove both annotations.
+			if !dn.journalRebootRequested {
+				node, err := internal.UpdateNodeRetry(dn.kubeClient.CoreV1().Nodes(), dn.nodeLister, dn.node.Name, func(node *corev1.Node) {
+					delete(node.Annotations, constants.MachineConfigDaemonRebootRequestedAnnotation)
+					delete(node.Annotations, constants.MachineConfigDaemonRebootApprovedAnnotation)
+				})
+				if err != nil {
+					return errors.Wrapf(err, "unsetting reboot annotations")
+				}
+				// And uncordon
+				if err := drain.Uncordon(dn.kubeClient.CoreV1().Nodes(), node, nil); err != nil {
+					return errors.Wrapf(err, "uncordoning")
+				}
+				glog.Info("Completed reboot")
+				dn.node = node
+			} else {
+				// A reboot is requested and approved, let's do it.
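+				// As with a config update, drain before rebooting so
+				// workloads are evicted gracefully.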
+				err := dn.drain()
+				if err != nil {
+					return errors.Wrapf(err, "draining failed")
+				}
+				return dn.reboot("rebooting due to annotation request")
+			}
+		} else {
+			glog.Info("Reboot is requested, but not approved yet")
+		}
 	}
 
 	glog.V(2).Infof("Node %s is already synced", node.Name)
 	return nil
@@ -488,6 +582,11 @@ func (dn *Daemon) Run(stopCh <-chan struct{}, exitCh <-chan error) error {
 		go dn.runKubeletHealthzMonitor(stopCh, dn.exitCh)
 	}
 
+	err := dn.runRebootRequestMonitor(stopCh, dn.exitCh)
+	if err != nil {
+		return errors.Wrapf(err, "failed to query journal for reboot requests")
+	}
+
 	defer utilruntime.HandleCrash()
 	defer dn.queue.ShutDown()
 
@@ -510,6 +609,27 @@ func (dn *Daemon) Run(stopCh <-chan struct{}, exitCh <-chan error) error {
 	}
 }
 
+// runRebootRequestMonitor watches the journal for requests to reboot
+func (dn *Daemon) runRebootRequestMonitor(stopCh <-chan struct{}, exitCh chan<- error) error {
+	rebootMessages, rebootChan, err := journal.QueryAndStream(stopCh, exitCh, "_UID=0", fmt.Sprintf("MESSAGE_ID=%s", constants.RequestRebootJournalMessageID))
+	if err != nil {
+		return err
+	}
+	if len(rebootMessages) > 0 {
+		glog.Infof("Detected %d reboot requests, last: %s", len(rebootMessages), rebootMessages[len(rebootMessages)-1].Message)
+		dn.journalRebootRequested = true
+	}
+	go func() {
+		for {
+			msg := <-rebootChan
+			glog.Infof("Got reboot request: %s", msg.Message)
+			dn.journalRebootRequested = true
+			dn.enqueueNode(dn.node)
+		}
+	}()
+	return nil
+}
+
 func (dn *Daemon) runLoginMonitor(stopCh <-chan struct{}, exitCh chan<- error) {
 	cmd := exec.Command("journalctl", "-b", "-f", "-o", "cat", "-u", logindUnit, "MESSAGE_ID="+sdMessageSessionStart)
 	stdout, err := cmd.StdoutPipe()
@@ -1286,6 +1406,21 @@ func ValidPath(path string) bool {
 	return false
 }
 
+// RequestReboot writes a special reboot request to the systemd journal,
+// which the MCD watches for.
+func RequestReboot(rationale string) error {
+	rationale = fmt.Sprintf("mcd reboot request: %s", rationale)
+	var msg bytes.Buffer
+	msg.WriteString(fmt.Sprintf(`MESSAGE_ID=%s
+MESSAGE=%s`, constants.RequestRebootJournalMessageID, rationale))
+	logger := exec.Command("logger", "--journald")
+	logger.Stdin = &msg
+	if err := logger.Run(); err != nil {
+		return errors.Wrapf(err, "failed to run logger")
+	}
+	return nil
+}
+
 // senseAndLoadOnceFrom gets a hold of the content for supported onceFrom configurations,
 // parses to verify the type, and returns back the genericInterface, the type description,
 // if it was local or remote, and error.
diff --git a/pkg/daemon/journal/journal.go b/pkg/daemon/journal/journal.go
new file mode 100644
index 0000000000..8e93db3a9d
--- /dev/null
+++ b/pkg/daemon/journal/journal.go
@@ -0,0 +1,97 @@
+package journal
+
+import (
+	"bufio"
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"os/exec"
+
+	"github.com/golang/glog"
+	"github.com/pkg/errors"
+)
+
+// Entry is a single journal message, as emitted by `journalctl -o json`.
+type Entry struct {
+	Message string `json:"MESSAGE"`
+	Cursor  string `json:"__CURSOR"`
+}
+
+// Query runs `journalctl -b` with the given match arguments and returns
+// all matching entries from the current boot.
+func Query(args ...string) ([]Entry, error) {
+	baseArgs := []string{"-b", "-o", "json"}
+	// We use journalctl as a subprocess because the MCD container is
+	// currently RHEL7 while RHCOS is RHEL8, and we chroot into the host.
+	cmd := exec.Command("journalctl", append(baseArgs, args...)...)
+	initialStdout, err := cmd.Output()
+	if err != nil {
+		return nil, err
+	}
+	scanner := bufio.NewScanner(bytes.NewReader(initialStdout))
+	// First, gather all matched messages.
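+	// journalctl -o json emits one JSON object per line, so each scanned
+	// line can be unmarshaled independently.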
+	retMessages := []Entry{}
+	for scanner.Scan() {
+		var msg Entry
+		err := json.Unmarshal(scanner.Bytes(), &msg)
+		if err != nil {
+			return nil, err
+		}
+		retMessages = append(retMessages, msg)
+	}
+
+	return retMessages, nil
+}
+
+// QueryAndStream runs journalctl as a subprocess, parsing its output and returning a stream of entries.
+// The "-b" argument is always passed which limits to entries from the current boot.
+func QueryAndStream(stopCh <-chan struct{}, errCh chan<- error, args ...string) ([]Entry, <-chan Entry, error) {
+	retMessages, err := Query(args...)
+	if err != nil {
+		return nil, nil, err
+	}
+	// The next invocation of journalctl will watch for further messages.
+	baseArgs := []string{"-b", "-f", "-o", "json"}
+	// If we found any matched messages, ensure we don't return duplicates
+	// by asking for messages after the cursor from the last message.
+	if len(retMessages) > 0 {
+		lastCursor := retMessages[len(retMessages)-1].Cursor
+		baseArgs = append(baseArgs, fmt.Sprintf("--after-cursor=%s", lastCursor))
+	}
+	watchCmd := exec.Command("journalctl", append(baseArgs, args...)...)
+	stdout, err := watchCmd.StdoutPipe()
+	if err != nil {
+		return nil, nil, err
+	}
+	if err := watchCmd.Start(); err != nil {
+		return nil, nil, err
+	}
+	scanner := bufio.NewScanner(stdout)
+	retChannel := make(chan Entry)
+	go func() {
+		// Block until a stop is requested, then tear down the subprocess.
+		<-stopCh
+		glog.Info("got stop request, killing journal monitor process")
+		watchCmd.Process.Kill()
+		watchCmd.Wait()
+	}()
+	go func() {
+		for scanner.Scan() {
+			var msg Entry
+			if err := json.Unmarshal(scanner.Bytes(), &msg); err != nil {
+				err := errors.Wrapf(err, "parsing journal")
+				glog.Errorf("journal watcher: %v", err)
+				errCh <- err
+				break
+			}
+			retChannel <- msg
+		}
+		if err := scanner.Err(); err != nil {
+			err := errors.Wrapf(err, "reading journal")
+			glog.Errorf("journal watcher: %v", err)
+			errCh <- err
+		}
+	}()
+	return retMessages, retChannel, nil
+}
diff --git a/pkg/daemon/update.go b/pkg/daemon/update.go
index c82193b175..bc167b9d02 100644
--- a/pkg/daemon/update.go
+++ b/pkg/daemon/update.go
@@ -102,6 +102,47 @@ func (dn *Daemon) updateOSAndReboot(newConfig *mcfgv1.MachineConfig) (retErr err
 	return dn.drainAndReboot(newConfig)
 }
 
+func (dn *Daemon) drain() error {
+	// Skip draining of the node when we're not cluster driven
+	if dn.kubeClient == nil {
+		return nil
+	}
+
+	dn.logSystem("Update prepared; beginning drain")
+
+	dn.recorder.Eventf(getNodeRef(dn.node), corev1.EventTypeNormal, "Drain", "Draining node to update config.")
+
+	backoff := wait.Backoff{
+		Steps:    5,
+		Duration: 10 * time.Second,
+		Factor:   2,
+	}
+	var lastErr error
+	if err := wait.ExponentialBackoff(backoff, func() (bool, error) {
+		err := drain.Drain(dn.kubeClient, []*corev1.Node{dn.node}, &drain.DrainOptions{
+			DeleteLocalData:    true,
+			Force:              true,
+			GracePeriodSeconds: 600,
+			IgnoreDaemonsets:   true,
+			Logger:             &drainLogger{},
+		})
+		if err == nil {
+			return true, nil
+		}
+		lastErr = err
+		glog.Infof("Draining failed with: %v, retrying", err)
+		return false, nil
+	}); err != nil {
+		if err == wait.ErrWaitTimeout {
+			return errors.Wrapf(lastErr, "failed to drain node (%d tries): %v", backoff.Steps, err)
+		}
+		return errors.Wrap(err, "failed to drain node")
+	}
+	dn.logSystem("drain complete")
+
+	return nil
+}
+
 func (dn *Daemon) drainAndReboot(newConfig *mcfgv1.MachineConfig) (retErr error) {
 	if out, err := dn.storePendingState(newConfig, 1); err != nil {
 		return errors.Wrapf(err, "failed to log pending config: %s", string(out))
@@ -121,39 +162,8 @@ func (dn *Daemon) drainAndReboot(newConfig *mcfgv1.MachineConfig) (retErr error)
 		dn.recorder.Eventf(getNodeRef(dn.node), corev1.EventTypeNormal, "PendingConfig", fmt.Sprintf("Written pending config %s", newConfig.GetName()))
 	}
 
-	// Skip draining of the node when we're not cluster driven
-	if dn.kubeClient != nil {
-		dn.logSystem("Update prepared; beginning drain")
-
-		dn.recorder.Eventf(getNodeRef(dn.node), corev1.EventTypeNormal, "Drain", "Draining node to update config.")
-
-		backoff := wait.Backoff{
-			Steps:    5,
-			Duration: 10 * time.Second,
-			Factor:   2,
-		}
-		var lastErr error
-		if err := wait.ExponentialBackoff(backoff, func() (bool, error) {
-			err := drain.Drain(dn.kubeClient, []*corev1.Node{dn.node}, &drain.DrainOptions{
-				DeleteLocalData:    true,
-				Force:              true,
-				GracePeriodSeconds: 600,
-				IgnoreDaemonsets:   true,
-				Logger:             &drainLogger{},
-			})
-			if err == nil {
-				return true, nil
-			}
-			lastErr = err
-			glog.Infof("Draining failed with: %v, retrying", err)
-			return false, nil
-		}); err != nil {
-			if err == wait.ErrWaitTimeout {
-				return errors.Wrapf(lastErr, "failed to drain node (%d tries): %v", backoff.Steps, err)
-			}
-			return errors.Wrap(err, "failed to drain node")
-		}
-		dn.logSystem("drain complete")
+	if err := dn.drain(); err != nil {
+		return err
 	}
 
 	// reboot. this function shouldn't actually return.
diff --git a/test/e2e/mcd_test.go b/test/e2e/mcd_test.go
index eccb54f3ac..53dc8a9629 100644
--- a/test/e2e/mcd_test.go
+++ b/test/e2e/mcd_test.go
@@ -163,9 +163,28 @@ func TestMCDeployed(t *testing.T) {
 	}
 }
 
-func bumpPoolMaxUnavailableTo(t *testing.T, cs *framework.ClientSet, max int) {
-	pool, err := cs.MachineConfigPools().Get("worker", metav1.GetOptions{})
-	require.Nil(t, err)
+func setPoolPaused(cs *framework.ClientSet, pool *mcfgv1.MachineConfigPool, pause bool) error {
+	if pool.Spec.Paused == pause {
+		return fmt.Errorf("pool %s is already in paused state: %v", pool.Name, pause)
+	}
+	old, err := json.Marshal(pool)
+	if err != nil {
+		return err
+	}
+	pool.Spec.Paused = pause
+	updated, err := json.Marshal(pool)
+	if err != nil {
+		return err
+	}
+	patch, err := jsonmergepatch.CreateThreeWayJSONMergePatch(old, updated, old)
+	if err != nil {
+		return err
+	}
+	_, err = cs.MachineConfigPools().Patch(pool.Name, types.MergePatchType, patch)
+	return err
+}
+
+func setPoolMaxUnavailable(t *testing.T, cs *framework.ClientSet, pool *mcfgv1.MachineConfigPool, max int) {
 	old, err := json.Marshal(pool)
 	require.Nil(t, err)
 	maxUnavailable := intstr.FromInt(max)
@@ -178,6 +197,12 @@ func bumpPoolMaxUnavailableTo(t *testing.T, cs *framework.ClientSet, max int) {
 	require.Nil(t, err)
 }
 
+func bumpPoolMaxUnavailableTo(t *testing.T, cs *framework.ClientSet, max int) {
+	pool, err := cs.MachineConfigPools().Get("worker", metav1.GetOptions{})
+	require.Nil(t, err)
+	setPoolMaxUnavailable(t, cs, pool, max)
+}
+
 func mcdForNode(cs *framework.ClientSet, node *corev1.Node) (*corev1.Pod, error) {
 	// find the MCD pod that has spec.nodeNAME = node.Name and get its name:
 	listOptions := metav1.ListOptions{
@@ -592,3 +617,90 @@ func TestFIPS(t *testing.T) {
 		t.Logf("Node %s has expected FIPS mode", node.Name)
 	}
 }
+
+func TestRequestReboot(t *testing.T) {
+	cs := framework.NewClientSet("")
+	pool, err := cs.MachineConfigPools().Get("worker", metav1.GetOptions{})
+	require.Nil(t, err)
+	// We must be starting from an updated pool
+	require.True(t, mcfgv1.IsMachineConfigPoolConditionTrue(pool.Status.Conditions, mcfgv1.MachineConfigPoolUpdated))
+	maxUnavail := int(pool.Status.MachineCount - 1)
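+	// Allow all but one worker to be unavailable at once, so the reboots
+	// mostly happen in parallel while one node stays schedulable.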
+	setPoolMaxUnavailable(t, cs, pool, maxUnavail)
+	nodes, err := getNodesByRole(cs, "worker")
+	require.Nil(t, err)
+	nodeBootIds := make(map[string]string)
+	// We pause the pool to ensure that we can watch for the reboot-requested annotations
+	// without races.
+	err = setPoolPaused(cs, pool, true)
+	require.Nil(t, err)
+	for _, node := range nodes {
+		mcd, err := mcdForNode(cs, &node)
+		require.Nil(t, err)
+		mcdName := mcd.ObjectMeta.Name
+		bootIdBuf, err := exec.Command("oc", "rsh", "-n", "openshift-machine-config-operator", mcdName,
+			"cat", "/proc/sys/kernel/random/boot_id").CombinedOutput()
+		require.Nil(t, err)
+		nodeBootIds[node.Name] = string(bootIdBuf)
+		// We do this hack since the MCD is part of machine-os-content which we aren't building/overriding in CI right now
+		cmd := "cp -p /usr/bin/machine-config-daemon /rootfs/tmp && chroot /rootfs /tmp/machine-config-daemon request-reboot test-request-reboot"
+		err = exec.Command("oc", "rsh", "-n", "openshift-machine-config-operator", mcdName,
+			"/bin/sh", "-c", cmd).Run()
+		require.Nil(t, err)
+	}
+	t.Log("requested reboot on all nodes")
+
+	// Wait for the rebooting annotations to show up
+	err = wait.Poll(5*time.Second, 2*time.Minute, func() (bool, error) {
+		nodes, err := getNodesByRole(cs, "worker")
+		require.Nil(t, err)
+		haveRebootingAnnotation := make(map[string]bool)
+		for _, node := range nodes {
+			node, err := cs.Nodes().Get(node.Name, metav1.GetOptions{})
+			require.Nil(t, err)
+			if _, ok := node.Annotations[constants.MachineConfigDaemonRebootRequestedAnnotation]; ok {
+				haveRebootingAnnotation[node.Name] = true
+			}
+		}
+		if len(haveRebootingAnnotation) == maxUnavail {
+			return true, nil
+		}
+		return false, nil
+	})
+	require.Nil(t, err)
+	t.Log("all reboot annotations appeared")
+
+	// Unpause the pool
+	err = setPoolPaused(cs, pool, false)
+	require.Nil(t, err)
+
+	startTime := time.Now()
+	err = wait.Poll(2*time.Second, 10*time.Minute, func() (bool, error) {
+		pool, err := cs.MachineConfigPools().Get("worker", metav1.GetOptions{})
+		require.Nil(t, err)
+		require.True(t, mcfgv1.IsMachineConfigPoolConditionTrue(pool.Status.Conditions, mcfgv1.MachineConfigPoolUpdated))
+		curRequest := int(pool.Status.RequestedRebootMachineCount)
+		if curRequest > maxUnavail {
+			return false, fmt.Errorf("requested reboot count is too high: %v", curRequest)
+		}
+		if curRequest == 0 {
+			return true, nil
+		}
+		return false, nil
+	})
+	require.Nil(t, err)
+	t.Log("pool reported all reboots complete")
+
+	for _, node := range nodes {
+		mcd, err := mcdForNode(cs, &node)
+		require.Nil(t, err)
+		mcdName := mcd.ObjectMeta.Name
+		bootIdBuf, err := exec.Command("oc", "rsh", "-n", "openshift-machine-config-operator", mcdName,
+			"cat", "/proc/sys/kernel/random/boot_id").CombinedOutput()
+		require.Nil(t, err)
+		bootId := string(bootIdBuf)
+		assert.NotEqual(t, nodeBootIds[node.Name], bootId)
+	}
+	t.Logf("Pool %s has completed rebooting (waited %v)", pool.Name, time.Since(startTime))
+}
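
For reviewers: below is a sketch of how a hypothetical out-of-tree agent could drive the same request/approve protocol end to end. It is illustrative only and not part of the patch - the helper names are invented, the constants are copied from pkg/daemon/constants, and the client-go calls assume the same pre-context-argument signatures this PR already uses. Note it can race if polling begins before the MCD has observed the journal entry and set the annotation.

package main

import (
	"bytes"
	"fmt"
	"os"
	"os/exec"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

const (
	// Copied from pkg/daemon/constants.
	rebootRequestedAnnotation = "machineconfiguration.openshift.io/reboot-requested"
	requestRebootMessageID    = "bebd48754ea1425ab458bb22693c696a"
)

// requestReboot mirrors daemon.RequestReboot: write the journal entry that
// the MCD's reboot request monitor matches on MESSAGE_ID. It must run as
// uid 0 on the target node, since the MCD filters on _UID=0.
func requestReboot(rationale string) error {
	var msg bytes.Buffer
	fmt.Fprintf(&msg, "MESSAGE_ID=%s\nMESSAGE=mcd reboot request: %s", requestRebootMessageID, rationale)
	cmd := exec.Command("logger", "--journald")
	cmd.Stdin = &msg
	return cmd.Run()
}

// waitForRebootHandled polls until the reboot-requested annotation has been
// cleared, which per this PR happens only after the node has rebooted and
// been uncordoned.
func waitForRebootHandled(client kubernetes.Interface, nodeName string) error {
	return wait.Poll(5*time.Second, 15*time.Minute, func() (bool, error) {
		node, err := client.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
		if err != nil {
			// Tolerate transient API errors while the node reboots.
			return false, nil
		}
		_, requested := node.Annotations[rebootRequestedAnnotation]
		return !requested, nil
	})
}

func main() {
	nodeName := os.Args[1]
	if err := requestReboot("picking up new kernel arguments"); err != nil {
		fmt.Fprintf(os.Stderr, "request: %v\n", err)
		os.Exit(1)
	}
	cfg, err := rest.InClusterConfig()
	if err != nil {
		fmt.Fprintf(os.Stderr, "config: %v\n", err)
		os.Exit(1)
	}
	if err := waitForRebootHandled(kubernetes.NewForConfigOrDie(cfg), nodeName); err != nil {
		fmt.Fprintf(os.Stderr, "wait: %v\n", err)
		os.Exit(1)
	}
	fmt.Println("reboot request handled")
}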