Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 38 additions & 79 deletions cmd/machine-config-daemon/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,17 @@ package main

import (
"flag"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strings"
"syscall"

"github.com/golang/glog"
"github.com/openshift/machine-config-operator/internal/clients"
controllercommon "github.com/openshift/machine-config-operator/pkg/controller/common"
"github.com/openshift/machine-config-operator/pkg/daemon"
mcfgclientset "github.com/openshift/machine-config-operator/pkg/generated/clientset/versioned"
"github.com/openshift/machine-config-operator/pkg/version"
"github.com/spf13/cobra"
"k8s.io/client-go/kubernetes"
)

var (
Expand Down Expand Up @@ -50,15 +46,6 @@ func init() {
startCmd.PersistentFlags().StringVar(&startOpts.kubeletHealthzEndpoint, "kubelet-healthz-endpoint", "http://localhost:10248/healthz", "healthz endpoint to check health")
}

// getBootID loads the unique "boot id" which is generated by the Linux kernel.
func getBootID() (string, error) {
currentBootIDBytes, err := ioutil.ReadFile("/proc/sys/kernel/random/boot_id")
if err != nil {
return "", err
}
return strings.TrimSpace(string(currentBootIDBytes)), nil
}

// bindPodMounts ensures that the daemon can still see e.g. /run/secrets/kubernetes.io
// service account tokens after chrooting. This function must be called before chroot.
func bindPodMounts(rootMount string) error {
Expand Down Expand Up @@ -125,49 +112,17 @@ func runStartCmd(cmd *cobra.Command, args []string) {
nodeWriter := daemon.NewNodeWriter()
go nodeWriter.Run(stopCh)

cb, err := clients.NewBuilder(startOpts.kubeconfig)
if err != nil {
if startOpts.onceFrom != "" {
glog.Info("Cannot initialize ClientBuilder, likely in onceFrom mode with Ignition")
} else {
glog.Fatalf("Failed to initialize ClientBuilder: %v", err)
}
}

var kubeClient kubernetes.Interface
if cb != nil {
kubeClient, err = cb.KubeClient(componentName)
if err != nil {
glog.Info("Cannot initialize kubeClient, likely in onceFrom mode with Ignition")
}
}

var dn *daemon.Daemon

bootID, err := getBootID()
if err != nil {
glog.Fatalf("Cannot get boot ID: %v", err)
}

// If we are asked to run once and it's a valid file system path use
// the bare Daemon
if startOpts.onceFrom != "" {
var mcClient mcfgclientset.Interface
if cb != nil {
mcClient, err = cb.MachineConfigClient(componentName)
if err != nil {
glog.Info("Cannot initialize MC client, likely in onceFrom mode with Ignition")
}
}
dn, err = daemon.New(
startOpts.nodeName,
operatingSystem,
daemon.NewNodeUpdaterClient(),
bootID,
startOpts.onceFrom,
startOpts.skipReboot,
mcClient,
kubeClient,
nil,
nil,
startOpts.kubeletHealthzEnabled,
startOpts.kubeletHealthzEndpoint,
nodeWriter,
Expand All @@ -178,45 +133,49 @@ func runStartCmd(cmd *cobra.Command, args []string) {
glog.Fatalf("Failed to initialize single run daemon: %v", err)
}
// Else we use the cluster driven daemon
} else {
if kubeClient == nil {
panic("Running in cluster mode without a kubeClient")
}
ctx := controllercommon.CreateControllerContext(cb, stopCh, componentName)
// create the daemon instance. this also initializes kube client items
// which need to come from the container and not the chroot.
dn, err = daemon.NewClusterDrivenDaemon(
startOpts.nodeName,
operatingSystem,
daemon.NewNodeUpdaterClient(),
ctx.InformerFactory.Machineconfiguration().V1().MachineConfigs(),
kubeClient,
bootID,
startOpts.onceFrom,
startOpts.skipReboot,
ctx.KubeInformerFactory.Core().V1().Nodes(),
startOpts.kubeletHealthzEnabled,
startOpts.kubeletHealthzEndpoint,
nodeWriter,
exitCh,
stopCh,
)

err = dn.RunOnceFrom(startOpts.onceFrom, startOpts.skipReboot)
if err != nil {
glog.Fatalf("Failed to initialize daemon: %v", err)
glog.Fatalf("%v", err)
}
return
}

ctx.KubeInformerFactory.Start(stopCh)
ctx.InformerFactory.Start(stopCh)
close(ctx.InformersStarted)
cb, err := clients.NewBuilder(startOpts.kubeconfig)
if err != nil {
glog.Fatalf("Failed to initialize ClientBuilder: %v", err)
}

glog.Info("Starting MachineConfigDaemon")
defer glog.Info("Shutting down MachineConfigDaemon")
kubeClient, err := cb.KubeClient(componentName)
if err != nil {
glog.Fatalf("Cannot initialize kubeClient: %v", err)
}

ctx := controllercommon.CreateControllerContext(cb, stopCh, componentName)
// create the daemon instance. this also initializes kube client items
// which need to come from the container and not the chroot.
dn, err = daemon.NewClusterDrivenDaemon(
startOpts.nodeName,
operatingSystem,
daemon.NewNodeUpdaterClient(),
ctx.InformerFactory.Machineconfiguration().V1().MachineConfigs(),
kubeClient,
ctx.KubeInformerFactory.Core().V1().Nodes(),
startOpts.kubeletHealthzEnabled,
startOpts.kubeletHealthzEndpoint,
nodeWriter,
exitCh,
stopCh,
)
if err != nil {
glog.Fatalf("Failed to initialize daemon: %v", err)
}

signaled := make(chan struct{})
dn.InstallSignalHandler(signaled)
ctx.KubeInformerFactory.Start(stopCh)
ctx.InformerFactory.Start(stopCh)
close(ctx.InformersStarted)

if err := dn.Run(stopCh, signaled, exitCh); err != nil {
if err := dn.Run(stopCh, exitCh); err != nil {
controllercommon.WriteTerminationError(err)
}
}
46 changes: 27 additions & 19 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,15 +171,21 @@ func rebootCommand(rationale string) *exec.Cmd {
"--description", fmt.Sprintf("machine-config-daemon: %s", rationale), "/bin/sh", "-c", "systemctl stop kubelet.service; reboot")
}

// getBootID loads the unique "boot id" which is generated by the Linux kernel.
func getBootID() (string, error) {
currentBootIDBytes, err := ioutil.ReadFile("/proc/sys/kernel/random/boot_id")
if err != nil {
return "", err
}
return strings.TrimSpace(string(currentBootIDBytes)), nil
}

// New sets up the systemd and kubernetes connections needed to update the
// machine.
func New(
nodeName,
operatingSystem string,
nodeUpdaterClient NodeUpdaterClient,
bootID,
onceFrom string,
skipReboot bool,
mcClient mcfgclientset.Interface,
kubeClient kubernetes.Interface,
kubeletHealthzEnabled bool,
Expand All @@ -202,14 +208,18 @@ func New(
glog.Infof("Booted osImageURL: %s (%s)", osImageURL, osVersion)
}

bootID, err := getBootID()
if err != nil {
return nil, errors.Wrapf(err, "failed to read boot ID")
}

dn := &Daemon{
booting: true,
name: nodeName,
OperatingSystem: operatingSystem,
NodeUpdaterClient: nodeUpdaterClient,
bootedOSImageURL: osImageURL,
bootID: bootID,
onceFrom: onceFrom,
skipReboot: skipReboot,
kubeletHealthzEnabled: kubeletHealthzEnabled,
kubeletHealthzEndpoint: kubeletHealthzEndpoint,
nodeWriter: nodeWriter,
Expand All @@ -232,9 +242,6 @@ func NewClusterDrivenDaemon(
nodeUpdaterClient NodeUpdaterClient,
mcInformer mcfginformersv1.MachineConfigInformer,
kubeClient kubernetes.Interface,
bootID,
onceFrom string,
skipReboot bool,
nodeInformer coreinformersv1.NodeInformer,
kubeletHealthzEnabled bool,
kubeletHealthzEndpoint string,
Expand All @@ -246,9 +253,6 @@ func NewClusterDrivenDaemon(
nodeName,
operatingSystem,
nodeUpdaterClient,
bootID,
onceFrom,
skipReboot,
nil,
kubeClient,
kubeletHealthzEnabled,
Expand Down Expand Up @@ -285,7 +289,6 @@ func NewClusterDrivenDaemon(

dn.enqueueNode = dn.enqueueDefault
dn.syncHandler = dn.syncNode
dn.booting = true

return dn, nil
}
Expand Down Expand Up @@ -440,7 +443,11 @@ func (dn *Daemon) detectEarlySSHAccessesFromBoot() error {
return nil
}

func (dn *Daemon) runOnceFrom() error {
// RunOnceFrom is the primary entrypoint for the non-cluster case
func (dn *Daemon) RunOnceFrom(onceFrom string, skipReboot bool) error {
dn.onceFrom = onceFrom
dn.skipReboot = skipReboot

configi, contentFrom, err := dn.senseAndLoadOnceFrom()
if err != nil {
glog.Warningf("Unable to decipher onceFrom config type: %s", err)
Expand Down Expand Up @@ -486,20 +493,21 @@ func (dn *Daemon) InstallSignalHandler(signaled chan struct{}) {
// Run finishes informer setup and then blocks, and the informer will be
// responsible for triggering callbacks to handle updates. Successful
// updates shouldn't return, and should just reboot the node.
func (dn *Daemon) Run(stopCh, signaled <-chan struct{}, exitCh <-chan error) error {
func (dn *Daemon) Run(stopCh <-chan struct{}, exitCh <-chan error) error {
dn.logSystem("Starting to manage node: %s", dn.name)
dn.LogSystemData()

glog.Info("Starting MachineConfigDaemon")
defer glog.Info("Shutting down MachineConfigDaemon")

signaled := make(chan struct{})
dn.InstallSignalHandler(signaled)

if dn.kubeletHealthzEnabled {
glog.Info("Enabling Kubelet Healthz Monitor")
go dn.runKubeletHealthzMonitor(stopCh, dn.exitCh)
}

// Catch quickly if we've been asked to run once.
if dn.onceFrom != "" {
return dn.runOnceFrom()
}

defer utilruntime.HandleCrash()
defer dn.queue.ShutDown()

Expand Down
3 changes: 0 additions & 3 deletions pkg/daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,6 @@ func (f *fixture) newController() *Daemon {
NewNodeUpdaterClient(),
i.Machineconfiguration().V1().MachineConfigs(),
f.kubeclient,
"",
"",
false,
k8sI.Core().V1().Nodes(),
false,
"",
Expand Down