diff --git a/Gopkg.lock b/Gopkg.lock index b2bd0daaee..09fae30a8e 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -304,6 +304,14 @@ pruneopts = "NUT" revision = "24818f796faf91cd76ec7bddd72458fbced7a6c1" +[[projects]] + digest = "1:dcb52a1496f6b1ceeb0c123c0a557bc04bd8f17071f45bc5492aa88ec2638401" + name = "github.com/google/renameio" + packages = ["."] + pruneopts = "NUT" + revision = "f0e32980c006571efd537032e5f9cd8c1a92819e" + version = "v0.1.0" + [[projects]] digest = "1:1bb197a3b5db4e06e00b7560f8e89836c486627f2a0338332ed37daa003d259e" name = "github.com/google/uuid" @@ -1179,6 +1187,7 @@ "github.com/davecgh/go-spew/spew", "github.com/ghodss/yaml", "github.com/golang/glog", + "github.com/google/renameio", "github.com/imdario/mergo", "github.com/kubernetes-sigs/cri-o/pkg/config", "github.com/openshift/api/config/v1", diff --git a/Gopkg.toml b/Gopkg.toml index 9edf0a30a9..232c179328 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -47,6 +47,10 @@ required = [ name = "github.com/apparentlymart/go-cidr" version = "1.0.0" +[[constraint]] + name = "github.com/google/renameio" + version = "v0.1.0" + [[constraint]] name = "github.com/ashcrow/osrelease" version = "1.0.0" diff --git a/cmd/machine-config-daemon/start.go b/cmd/machine-config-daemon/start.go index 0f7ed3a461..37b1d195ff 100644 --- a/cmd/machine-config-daemon/start.go +++ b/cmd/machine-config-daemon/start.go @@ -9,7 +9,6 @@ import ( "github.com/openshift/machine-config-operator/cmd/common" "github.com/openshift/machine-config-operator/pkg/daemon" "github.com/openshift/machine-config-operator/pkg/version" - "github.com/pkg/errors" "github.com/spf13/cobra" ) @@ -97,7 +96,6 @@ func runStartCmd(cmd *cobra.Command, args []string) { startOpts.nodeName, operatingSystem, daemon.NewNodeUpdaterClient(), - daemon.NewFileSystemClient(), startOpts.onceFrom, startOpts.kubeletHealthzEnabled, startOpts.kubeletHealthzEndpoint, @@ -122,9 +120,8 @@ func runStartCmd(cmd *cobra.Command, args []string) { startOpts.nodeName, operatingSystem, daemon.NewNodeUpdaterClient(), - cb.MachineConfigClientOrDie(componentName), + ctx.InformerFactory.Machineconfiguration().V1().MachineConfigs(), cb.KubeClientOrDie(componentName), - daemon.NewFileSystemClient(), startOpts.onceFrom, ctx.KubeInformerFactory.Core().V1().Nodes(), startOpts.kubeletHealthzEnabled, @@ -154,19 +151,15 @@ func runStartCmd(cmd *cobra.Command, args []string) { } if startOpts.onceFrom == "" { - err = dn.CheckStateOnBoot() - if err != nil { - dn.EnterDegradedState(errors.Wrapf(err, "Checking initial state")) - } ctx.KubeInformerFactory.Start(stopCh) + ctx.InformerFactory.Start(stopCh) close(ctx.InformersStarted) } glog.Info("Starting MachineConfigDaemon") defer glog.Info("Shutting down MachineConfigDaemon") - err = dn.Run(stopCh, exitCh) - if err != nil { + if err := dn.Run(stopCh, exitCh); err != nil { glog.Fatalf("failed to run: %v", err) } } diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index 57a710d7b4..f812fa8fa7 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -13,6 +13,7 @@ import ( "strings" "time" + imgref "github.com/containers/image/docker/reference" "github.com/coreos/go-systemd/login1" "github.com/coreos/go-systemd/sdjournal" ignv2 "github.com/coreos/ignition/config/v2_2" @@ -22,13 +23,13 @@ import ( "github.com/openshift/machine-config-operator/lib/resourceread" mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1" "github.com/openshift/machine-config-operator/pkg/daemon/constants" - mcfgclientset "github.com/openshift/machine-config-operator/pkg/generated/clientset/versioned" - mcfgclientv1 "github.com/openshift/machine-config-operator/pkg/generated/clientset/versioned/typed/machineconfiguration.openshift.io/v1" + mcfginformersv1 "github.com/openshift/machine-config-operator/pkg/generated/informers/externalversions/machineconfiguration.openshift.io/v1" + mcfglistersv1 "github.com/openshift/machine-config-operator/pkg/generated/listers/machineconfiguration.openshift.io/v1" "github.com/pkg/errors" "github.com/vincent-petithory/dataurl" - imgref "github.com/containers/image/docker/reference" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" coreinformersv1 "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" @@ -37,6 +38,7 @@ import ( corelisterv1 "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" ) // Daemon is the dispatch point for the functions of the agent on the @@ -61,22 +63,21 @@ type Daemon struct { // machine loginClient *login1.Conn - client mcfgclientset.Interface // kubeClient allows interaction with Kubernetes, including the node we are running on. kubeClient kubernetes.Interface // recorder sends events to the apiserver recorder record.EventRecorder - // filesystemClient allows interaction with the local filesystm - fileSystemClient FileSystemClient - // rootMount is the location for the MCD to chroot in rootMount string // nodeLister is used to watch for updates via the informer - nodeLister corelisterv1.NodeLister - + nodeLister corelisterv1.NodeLister nodeListerSynced cache.InformerSynced + + mcLister mcfglistersv1.MachineConfigLister + mcListerSynced cache.InformerSynced + // onceFrom defines where the source config is to run the daemon once and exit onceFrom string @@ -96,6 +97,15 @@ type Daemon struct { // node is the current instance of the node being processed through handleNodeUpdate // or the very first instance grabbed when the daemon starts node *corev1.Node + + // remove the funcs below once proper e2e testing is done for updating ssh keys + atomicSSHKeysWriter func(ignv2_2types.PasswdUser, string) error + + queue workqueue.RateLimitingInterface + enqueueNode func(*corev1.Node) + syncHandler func(node string) error + + booting bool } // pendingConfigState is stored as JSON at pathStateJSON; it is only @@ -125,13 +135,23 @@ const ( machineConfigOnceFromRemoteConfig = "REMOTE" // machineConfigOnceFromLocalConfig denotes that the config was found locally machineConfigOnceFromLocalConfig = "LOCAL" -) -const ( kubeletHealthzEndpoint = "http://localhost:10248/healthz" kubeletHealthzPollingInterval = time.Duration(30 * time.Second) kubeletHealthzTimeout = time.Duration(30 * time.Second) kubeletHealthzFailureThreshold = 3 + + // TODO(runcom): increase retry and backoff? + // + // maxRetries is the number of times a node will be retried before it is dropped out of the queue. + // With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the times + // a node is going to be requeued: + // + // 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s + maxRetries = 15 + + // updateDelay is a pause to deal with churn in Node + updateDelay = 5 * time.Second ) // getBootID loads the unique "boot id" which is generated by the Linux kernel. @@ -150,7 +170,6 @@ func New( nodeName string, operatingSystem string, nodeUpdaterClient NodeUpdaterClient, - fileSystemClient FileSystemClient, onceFrom string, kubeletHealthzEnabled bool, kubeletHealthzEndpoint string, @@ -186,7 +205,6 @@ func New( NodeUpdaterClient: nodeUpdaterClient, loginClient: loginClient, rootMount: rootMount, - fileSystemClient: fileSystemClient, bootID: bootID, bootedOSImageURL: osImageURL, onceFrom: onceFrom, @@ -196,6 +214,7 @@ func New( exitCh: exitCh, stopCh: stopCh, } + dn.atomicSSHKeysWriter = dn.atomicallyWriteSSHKey return dn, nil } @@ -207,9 +226,8 @@ func NewClusterDrivenDaemon( nodeName string, operatingSystem string, nodeUpdaterClient NodeUpdaterClient, - client mcfgclientset.Interface, + mcInformer mcfginformersv1.MachineConfigInformer, kubeClient kubernetes.Interface, - fileSystemClient FileSystemClient, onceFrom string, nodeInformer coreinformersv1.NodeInformer, kubeletHealthzEnabled bool, @@ -223,7 +241,6 @@ func NewClusterDrivenDaemon( nodeName, operatingSystem, nodeUpdaterClient, - fileSystemClient, onceFrom, kubeletHealthzEnabled, kubeletHealthzEndpoint, @@ -237,7 +254,7 @@ func NewClusterDrivenDaemon( } dn.kubeClient = kubeClient - dn.client = client + dn.queue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "machineconfigdaemon") eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.V(2).Infof) @@ -246,13 +263,6 @@ func NewClusterDrivenDaemon( glog.Infof("Managing node: %s", nodeName) - if err := dn.setInitialNode(nodeName); err != nil { - return nil, err - } - if err = dn.loadNodeAnnotations(); err != nil { - return nil, err - } - go dn.runLoginMonitor(dn.stopCh, dn.exitCh) nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -260,10 +270,153 @@ func NewClusterDrivenDaemon( }) dn.nodeLister = nodeInformer.Lister() dn.nodeListerSynced = nodeInformer.Informer().HasSynced + dn.mcLister = mcInformer.Lister() + dn.mcListerSynced = mcInformer.Informer().HasSynced + + dn.enqueueNode = dn.enqueueDefault + dn.syncHandler = dn.syncNode + dn.booting = true return dn, nil } +// worker runs a worker thread that just dequeues items, processes them, and marks them done. +// It enforces that the syncHandler is never invoked concurrently with the same key. +func (dn *Daemon) worker() { + for dn.processNextWorkItem() { + } +} + +func (dn *Daemon) processNextWorkItem() bool { + if dn.booting { + // any error here in bootstrap will cause a retry + if err := dn.bootstrapNode(); err != nil { + glog.Warningf("Booting the MCD errored with %v", err) + } + return true + } + key, quit := dn.queue.Get() + if quit { + return false + } + defer dn.queue.Done(key) + + err := dn.syncHandler(key.(string)) + dn.handleErr(err, key) + + return true +} + +// bootstrapNode takes care of the very first sync of the MCD on a node. +// It loads the node annotation from the bootstrap (if we're really bootstrapping) +// and then proceed to checking the state of the node, which includes +// finalizing an update and/or reconciling the current and desired machine configs. +func (dn *Daemon) bootstrapNode() error { + node, err := dn.nodeLister.Get(dn.name) + if err != nil { + if apierrors.IsNotFound(err) { + glog.V(2).Infof("can't find node %s: %v", dn.name, err) + return nil + } + return err + } + node, err = dn.loadNodeAnnotations(node) + if err != nil { + return err + } + dn.node = node + if err := dn.CheckStateOnBoot(); err != nil { + return err + } + // finished syncing node for the first time + dn.booting = false + return nil +} + +func (dn *Daemon) handleErr(err error, key interface{}) { + if err == nil { + dn.queue.Forget(key) + return + } + + if dn.queue.NumRequeues(key) < maxRetries { + glog.V(2).Infof("Error syncing node %v: %v", key, err) + dn.queue.AddRateLimited(key) + return + } + + dn.nodeWriter.SetDegraded(err, dn.kubeClient.CoreV1().Nodes(), dn.nodeLister, dn.name) + + utilruntime.HandleError(err) + glog.V(2).Infof("Dropping node %q out of the queue: %v", key, err) + dn.queue.Forget(key) + dn.queue.AddAfter(key, 1*time.Minute) +} + +func (dn *Daemon) syncNode(key string) error { + startTime := time.Now() + glog.V(4).Infof("Started syncing node %q (%v)", key, startTime) + defer func() { + glog.V(4).Infof("Finished syncing node %q (%v)", key, time.Since(startTime)) + }() + + _, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return err + } + node, err := dn.nodeLister.Get(name) + if apierrors.IsNotFound(err) { + glog.V(2).Infof("node %v has been deleted", key) + return nil + } + if err != nil { + return err + } + + // Deep-copy otherwise we are mutating our cache. + node = node.DeepCopy() + + // Check for Deleted Node + if node.DeletionTimestamp != nil { + return nil + } + + // First check if the node that was updated is this daemon's node + if node.Name == dn.name { + // stash the current node being processed + dn.node = node + // Pass to the shared update prep method + needUpdate, err := dn.prepUpdateFromCluster() + if err != nil { + glog.Infof("Unable to prep update: %s", err) + return err + } + if needUpdate { + if err := dn.triggerUpdateWithMachineConfig(nil, nil); err != nil { + glog.Infof("Unable to apply update: %s", err) + return err + } + } + } + return nil +} + +// enqueueAfter will enqueue a node after the provided amount of time. +func (dn *Daemon) enqueueAfter(node *corev1.Node, after time.Duration) { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(node) + if err != nil { + utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", node, err)) + return + } + + dn.queue.AddAfter(key, after) +} + +// enqueueDefault calls a default enqueue function +func (dn *Daemon) enqueueDefault(node *corev1.Node) { + dn.enqueueAfter(node, updateDelay) +} + const ( // IDs are taken from https://cgit.freedesktop.org/systemd/systemd/plain/src/systemd/sd-messages.h sdMessageSessionStart = "8d45620c1a4348dbb17410da57c60c66" @@ -310,6 +463,9 @@ func (dn *Daemon) detectEarlySSHAccessesFromBoot() error { // responsible for triggering callbacks to handle updates. Successful // updates shouldn't return, and should just reboot the node. func (dn *Daemon) Run(stopCh <-chan struct{}, exitCh <-chan error) error { + defer utilruntime.HandleCrash() + defer dn.queue.ShutDown() + if dn.kubeletHealthzEnabled { glog.Info("Enabling Kubelet Healthz Monitor") go dn.runKubeletHealthzMonitor(stopCh, dn.exitCh) @@ -335,17 +491,21 @@ func (dn *Daemon) Run(stopCh <-chan struct{}, exitCh <-chan error) error { } } - if !cache.WaitForCacheSync(stopCh, dn.nodeListerSynced) { - return dn.nodeWriter.SetUpdateDegradedMsgIgnoreErr("failed to sync cache", dn.kubeClient.CoreV1().Nodes(), dn.name) + if !cache.WaitForCacheSync(stopCh, dn.nodeListerSynced, dn.mcListerSynced) { + return errors.New("failed to sync initial listers cache") } - // Block on exit channel. The node informer will send callbacks through - // handleNodeUpdate(). If a failure happens there, it writes to the channel. - // The HealthzMonitor goroutine also writes to this channel if the threshold - // is reached. - err := <-exitCh + go wait.Until(dn.worker, time.Second, stopCh) - return dn.nodeWriter.SetUpdateDegradedIgnoreErr(err, dn.kubeClient.CoreV1().Nodes(), dn.name) + for { + select { + case <-stopCh: + return nil + case err := <-exitCh: + // This channel gets errors from auxiliary goroutines like loginmonitor and kubehealth + glog.Warningf("Got an error from auxiliary tools: %v", err) + } + } } // BindPodMounts ensures that the daemon can still see e.g. /run/secrets/kubernetes.io @@ -381,7 +541,7 @@ func (dn *Daemon) runLoginMonitor(stopCh <-chan struct{}, exitCh chan<- error) { } func (dn *Daemon) applySSHAccessedAnnotation() error { - if err := dn.nodeWriter.SetSSHAccessed(dn.kubeClient.CoreV1().Nodes(), dn.name); err != nil { + if err := dn.nodeWriter.SetSSHAccessed(dn.kubeClient.CoreV1().Nodes(), dn.nodeLister, dn.name); err != nil { return fmt.Errorf("error: cannot apply annotation for SSH access due to: %v", err) } return nil @@ -440,15 +600,6 @@ func (dn *Daemon) getHealth() error { return nil } -// EnterDegradedState causes the MCD to update the annotations -// to note that we're degraded, and sleep forever. -func (dn *Daemon) EnterDegradedState(err error) { - glog.Errorf("fatal error checking initial state of node: %v", err) - dn.nodeWriter.SetUpdateDegradedIgnoreErr(err, dn.kubeClient.CoreV1().Nodes(), dn.name) - glog.Info("Entering degraded state; going to sleep") - select {} -} - // stateAndConfigs is the "state" node annotation plus parsed machine configs // referenced by the currentConfig and desiredConfig annotations. If we have // a "pending" config (we're coming up after a reboot attempting to apply a config), @@ -485,7 +636,7 @@ func (dn *Daemon) getStateAndConfigs(pendingConfigName string) (*stateAndConfigs if err != nil { return nil, err } - currentConfig, err := getMachineConfig(dn.client.MachineconfigurationV1().MachineConfigs(), currentConfigName) + currentConfig, err := dn.mcLister.Get(currentConfigName) if err != nil { return nil, err } @@ -504,7 +655,7 @@ func (dn *Daemon) getStateAndConfigs(pendingConfigName string) (*stateAndConfigs desiredConfig = currentConfig glog.Infof("Current+desired config: %s", currentConfigName) } else { - desiredConfig, err = getMachineConfig(dn.client.MachineconfigurationV1().MachineConfigs(), desiredConfigName) + desiredConfig, err = dn.mcLister.Get(desiredConfigName) if err != nil { return nil, err } @@ -519,7 +670,7 @@ func (dn *Daemon) getStateAndConfigs(pendingConfigName string) (*stateAndConfigs if pendingConfigName == desiredConfigName { pendingConfig = desiredConfig } else if pendingConfigName != "" { - pendingConfig, err = getMachineConfig(dn.client.MachineconfigurationV1().MachineConfigs(), pendingConfigName) + pendingConfig, err = dn.mcLister.Get(pendingConfigName) if err != nil { return nil, err } @@ -590,14 +741,6 @@ func (dn *Daemon) CheckStateOnBoot() error { return fmt.Errorf("error detecting early SSH accesses: %v", err) } } - if state.state == constants.MachineConfigDaemonStateDegraded { - // We're already degraded. Sleep so that we don't clobber - // output of previous run which probably contains the real - // reason why we marked the node as degraded in the first place - // (Though we should probably serialize the reason as an annotation) - glog.Info("Node is degraded; going to sleep") - select {} - } if state.bootstrapping { targetOSImageURL := state.currentConfig.Spec.OSImageURL @@ -611,6 +754,7 @@ func (dn *Daemon) CheckStateOnBoot() error { return dn.updateOSAndReboot(state.currentConfig) } glog.Info("No bootstrap pivot required; unlinking bootstrap node annotations") + // Delete the bootstrap node annotations; the // currentConfig's osImageURL should now be *truth*. // In other words if it drifts somehow, we go degraded. @@ -647,7 +791,7 @@ func (dn *Daemon) CheckStateOnBoot() error { // were coming up, so we next look at that before uncordoning the node (so // we don't uncordon and then immediately re-cordon) if state.pendingConfig != nil { - if err := dn.nodeWriter.SetUpdateDone(dn.kubeClient.CoreV1().Nodes(), dn.name, state.pendingConfig.GetName()); err != nil { + if err := dn.nodeWriter.SetUpdateDone(dn.kubeClient.CoreV1().Nodes(), dn.nodeLister, dn.name, state.pendingConfig.GetName()); err != nil { return err } // And remove the pending state file @@ -687,14 +831,16 @@ func (dn *Daemon) runOnceFromMachineConfig(machineConfig mcfgv1.MachineConfig, c // NOTE: This case expects a cluster to exists already. needUpdate, err := dn.prepUpdateFromCluster() if err != nil { - return dn.nodeWriter.SetUpdateDegradedIgnoreErr(err, dn.kubeClient.CoreV1().Nodes(), dn.name) + dn.nodeWriter.SetDegraded(err, dn.kubeClient.CoreV1().Nodes(), dn.nodeLister, dn.name) + return err } if !needUpdate { return nil } // At this point we have verified we need to update - if err := dn.executeUpdateFromClusterWithMachineConfig(&machineConfig); err != nil { - return dn.nodeWriter.SetUpdateDegradedIgnoreErr(err, dn.kubeClient.CoreV1().Nodes(), dn.name) + if err := dn.triggerUpdateWithMachineConfig(nil, &machineConfig); err != nil { + dn.nodeWriter.SetDegraded(err, dn.kubeClient.CoreV1().Nodes(), dn.nodeLister, dn.name) + return err } return nil } @@ -720,50 +866,18 @@ func (dn *Daemon) runOnceFromIgnition(ignConfig ignv2_2types.Config) error { return dn.reboot("runOnceFromIgnition complete") } -// handleNodeUpdate is the gatekeeper handler for informer callbacks detecting -// node changes. If an update is requested by the controller, we assume that -// that means something changed and pass over to execution methods no matter what. -// Also note that we only care about node updates, not creation or deletion. func (dn *Daemon) handleNodeUpdate(old, cur interface{}) { - node := cur.(*corev1.Node) + oldNode := old.(*corev1.Node) + curNode := cur.(*corev1.Node) - // First check if the node that was updated is this daemon's node - if node.Name == dn.name { - // stash the current node being processed - dn.node = node - // Pass to the shared update prep method - needUpdate, err := dn.prepUpdateFromCluster() - if err != nil { - glog.Infof("Unable to prep update: %s", err) - dn.exitCh <- err - return - } - // Only executeUpdateFromCluster when we need to update - if needUpdate { - if err = dn.executeUpdateFromCluster(); err != nil { - glog.Infof("Unable to apply update: %s", err) - dn.exitCh <- err - return - } - } - } - // The node that was changed was not ours, return out - return + glog.V(4).Infof("Updating Node %s", oldNode.Name) + dn.enqueueNode(curNode) } // prepUpdateFromCluster handles the shared update prepping functionality for // flows that expect the cluster to already be available. Returns true if an // update is required, false otherwise. func (dn *Daemon) prepUpdateFromCluster() (bool, error) { - // Then check we're not already in a degraded state. - state, err := getNodeAnnotation(dn.node, constants.MachineConfigDaemonStateAnnotationKey) - if err != nil { - return false, err - } - if state == constants.MachineConfigDaemonStateDegraded { - return false, fmt.Errorf("state is already degraded") - } - desiredConfigName, err := getNodeAnnotationExt(dn.node, constants.DesiredMachineConfigAnnotationKey, true) if err != nil { return false, err @@ -784,26 +898,6 @@ func (dn *Daemon) prepUpdateFromCluster() (bool, error) { return true, nil } -// executeUpdateFromClusterWithMachineConfig starts the actual update process. The provided config -// will be used as the desired config, while the current config will be pulled from the cluster. If -// you want both pulled from the cluster please use executeUpdateFromCluster(). -func (dn *Daemon) executeUpdateFromClusterWithMachineConfig(desiredConfig *mcfgv1.MachineConfig) error { - // The desired machine config has changed, trigger update - if err := dn.triggerUpdateWithMachineConfig(nil, desiredConfig); err != nil { - return err - } - - // we managed to update the machine without rebooting. in this case, - // continue as usual waiting for the next update - glog.V(2).Info("Successfully updated without reboot") - return nil -} - -// executeUpdateFromCluster starts the actual update process using configs from the cluster. -func (dn *Daemon) executeUpdateFromCluster() error { - return dn.executeUpdateFromClusterWithMachineConfig(nil) -} - // completeUpdate marks the node as schedulable again, then deletes the // "transient state" file, which signifies that all of those prior steps have // been completed. @@ -825,7 +919,7 @@ func (dn *Daemon) triggerUpdateWithMachineConfig(currentConfig *mcfgv1.MachineCo if err != nil { return err } - currentConfig, err = getMachineConfig(dn.client.MachineconfigurationV1().MachineConfigs(), ccAnnotation) + currentConfig, err = dn.mcLister.Get(ccAnnotation) if err != nil { return err } @@ -836,7 +930,7 @@ func (dn *Daemon) triggerUpdateWithMachineConfig(currentConfig *mcfgv1.MachineCo if err != nil { return err } - desiredConfig, err = getMachineConfig(dn.client.MachineconfigurationV1().MachineConfigs(), dcAnnotation) + desiredConfig, err = dn.mcLister.Get(dcAnnotation) if err != nil { return err } @@ -1028,21 +1122,6 @@ func (dn *Daemon) Close() { dn.loginClient.Close() } -func getMachineConfig(client mcfgclientv1.MachineConfigInterface, name string) (*mcfgv1.MachineConfig, error) { - // Retry for 5 minutes to get a MachineConfig in case of transient errors. - var mc *mcfgv1.MachineConfig - err := wait.PollImmediate(10*time.Second, 5*time.Minute, func() (bool, error) { - var err error - mc, err = client.Get(name, metav1.GetOptions{}) - if err == nil { - return true, nil - } - glog.Infof("While getting MachineConfig %s, got: %v. Retrying...", name, err) - return false, nil - }) - return mc, err -} - // ValidPath attempts to see if the path provided is indeed an acceptable // filesystem path. This function does not check if the path exists. func ValidPath(path string) bool { @@ -1070,7 +1149,7 @@ func (dn *Daemon) SenseAndLoadOnceFrom() (interface{}, string, string, error) { } defer resp.Body.Close() // Read the body content from the request - content, err = dn.fileSystemClient.ReadAll(resp.Body) + content, err = ioutil.ReadAll(resp.Body) if err != nil { return nil, "", contentFrom, err } @@ -1082,7 +1161,7 @@ func (dn *Daemon) SenseAndLoadOnceFrom() (interface{}, string, string, error) { return nil, "", contentFrom, err } - content, err = dn.fileSystemClient.ReadFile(absoluteOnceFrom) + content, err = ioutil.ReadFile(absoluteOnceFrom) if err != nil { return nil, "", contentFrom, err } diff --git a/pkg/daemon/fsclient.go b/pkg/daemon/fsclient.go deleted file mode 100644 index fae3c6160d..0000000000 --- a/pkg/daemon/fsclient.go +++ /dev/null @@ -1,86 +0,0 @@ -package daemon - -import ( - "io" - "io/ioutil" - "os" -) - -// FileSystemClient abstracts file/directory manipulation operations -type FileSystemClient interface { - Create(string) (*os.File, error) - Remove(string) error - RemoveAll(string) error - MkdirAll(string, os.FileMode) error - Stat(string) (os.FileInfo, error) - Symlink(string, string) error - Chmod(string, os.FileMode) error - Chown(string, int, int) error - WriteFile(filename string, data []byte, perm os.FileMode) error - ReadAll(reader io.Reader) ([]byte, error) - ReadFile(filename string) ([]byte, error) -} - -// FsClient is used to hang the FileSystemClient functions on. -type FsClient struct{} - -// Create implements os.Create -func (f FsClient) Create(name string) (*os.File, error) { - return os.Create(name) -} - -// Remove implements os.Remove -func (f FsClient) Remove(name string) error { - return os.Remove(name) -} - -// RemoveAll implements os.RemoveAll -func (f FsClient) RemoveAll(path string) error { - return os.RemoveAll(path) -} - -// MkdirAll implements os.MkdirAll -func (f FsClient) MkdirAll(name string, perm os.FileMode) error { - return os.MkdirAll(name, perm) -} - -// Stat implements os.Stat -func (f FsClient) Stat(name string) (os.FileInfo, error) { - return os.Stat(name) -} - -// Symlink implements os.Symlink -func (f FsClient) Symlink(oldname, newname string) error { - return os.Symlink(oldname, newname) -} - -// Chmod implements os.Chmod -func (f FsClient) Chmod(name string, mode os.FileMode) error { - return os.Chmod(name, mode) -} - -// Chown implements os.Chown -func (f FsClient) Chown(name string, uid, gid int) error { - return os.Chown(name, uid, gid) -} - -// WriteFile implements ioutil.WriteFile -func (f FsClient) WriteFile(filename string, data []byte, perm os.FileMode) error { - return ioutil.WriteFile(filename, data, perm) -} - -// ReadFile implements ioutil.WriteFile -func (f FsClient) ReadFile(filename string) ([]byte, error) { - return ioutil.ReadFile(filename) -} - -// ReadAll implements ioutil.ReadAll -func (f FsClient) ReadAll(reader io.Reader) ([]byte, error) { - return ioutil.ReadAll(reader) -} - -// NewFileSystemClient creates a new file system client using the default -// implementations provided by the os package. -func NewFileSystemClient() FileSystemClient { - return FsClient{} -} diff --git a/pkg/daemon/fsclient_test.go b/pkg/daemon/fsclient_test.go deleted file mode 100644 index c61ce80fce..0000000000 --- a/pkg/daemon/fsclient_test.go +++ /dev/null @@ -1,124 +0,0 @@ -package daemon - -import ( - "io" - "os" -) - -// CreateReturn is a structure used for testing. It holds a single return value -// set for a mocked Create call. -type CreateReturn struct { - OsFilePointer *os.File - Error error -} - -// StatReturn is a structure used for testing. It holds a single return value -// set for a mocked Stat call. -type StatReturn struct { - OsFileInfo os.FileInfo - Error error -} - -// ReadFileReturn is a structure used for testing. It holds a single return value -// set for a mocked ReadFile call. -type ReadFileReturn struct { - Bytes []byte - Error error -} - -// FsClientMockMock is used as a mock of FsClientMock for testing. -type FsClientMock struct { - CreateReturns []CreateReturn - RemoveReturns []error - RemoveAllReturns []error - MkdirAllReturns []error - StatReturns []StatReturn - SymlinkReturns []error - ChmodReturns []error - ChownReturns []error - WriteFileReturns []error - ReadAllReturns []ReadFileReturn - ReadFileReturns []ReadFileReturn -} - -// updateErrorReturns is a shortcut to pop out the error and shift -// around the Returns array. -func updateErrorReturns(returns *[]error) error { - r := *returns - returnValues := r[0] - if len(r) > 1 { - *returns = r[1:] - } - return returnValues -} - -// Create provides a mocked implemention -func (f FsClientMock) Create(name string) (*os.File, error) { - returnValues := f.CreateReturns[0] - if len(f.RemoveReturns) > 1 { - f.CreateReturns = f.CreateReturns[1:] - } - return returnValues.OsFilePointer, returnValues.Error -} - -// Remove provides a mocked implemention -func (f FsClientMock) Remove(name string) error { - return updateErrorReturns(&f.RemoveReturns) -} - -// RemoveAll provides a mocked implemention -func (f FsClientMock) RemoveAll(path string) error { - return updateErrorReturns(&f.RemoveReturns) -} - -// MkdirAll provides a mocked implemention -func (f FsClientMock) MkdirAll(name string, perm os.FileMode) error { - return updateErrorReturns(&f.MkdirAllReturns) -} - -// Stat provides a mocked implemention -func (f FsClientMock) Stat(name string) (os.FileInfo, error) { - returnValues := f.StatReturns[0] - if len(f.RemoveReturns) > 1 { - f.StatReturns = f.StatReturns[1:] - } - return returnValues.OsFileInfo, returnValues.Error -} - -// Symlink provides a mocked implemention -func (f FsClientMock) Symlink(oldname, newname string) error { - return updateErrorReturns(&f.SymlinkReturns) -} - -// Chmod provides a mocked implemention -func (f FsClientMock) Chmod(name string, mode os.FileMode) error { - return updateErrorReturns(&f.ChmodReturns) -} - -// Chown provides a mocked implemention -func (f FsClientMock) Chown(name string, uid, gid int) error { - return updateErrorReturns(&f.ChownReturns) -} - -// WriteFile provides a mocked implemention -func (f FsClientMock) WriteFile(filename string, data []byte, perm os.FileMode) error { - return updateErrorReturns(&f.WriteFileReturns) -} - -// ReadAll provides a mocked implemention -func (f FsClientMock) ReadAll(reader io.Reader) ([]byte, error) { - returnValues := f.ReadAllReturns[0] - if len(f.ReadAllReturns) > 1 { - f.ReadAllReturns = f.ReadAllReturns[1:] - } - return returnValues.Bytes, returnValues.Error -} - -// ReadFile provides a mocked implemention -func (f FsClientMock) ReadFile(filename string) ([]byte, error) { - returnValues := f.ReadFileReturns[0] - if len(f.ReadFileReturns) > 1 { - f.ReadFileReturns = f.ReadFileReturns[1:] - } - return returnValues.Bytes, returnValues.Error -} diff --git a/pkg/daemon/node.go b/pkg/daemon/node.go index e90de72234..e429fac037 100644 --- a/pkg/daemon/node.go +++ b/pkg/daemon/node.go @@ -4,46 +4,38 @@ import ( "encoding/json" "fmt" "io/ioutil" - "time" "github.com/golang/glog" "github.com/openshift/machine-config-operator/pkg/daemon/constants" - "github.com/pkg/errors" core_v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" - corev1 "k8s.io/client-go/kubernetes/typed/core/v1" ) -func (dn *Daemon) loadNodeAnnotations() error { - ccAnnotation, err := getNodeAnnotation(dn.node, constants.CurrentMachineConfigAnnotationKey) +func (dn *Daemon) loadNodeAnnotations(node *core_v1.Node) (*core_v1.Node, error) { + ccAnnotation, err := getNodeAnnotation(node, constants.CurrentMachineConfigAnnotationKey) // we need to load the annotations from the file only for the // first run. // the initial annotations do no need to be set if the node // already has annotations. if err == nil && ccAnnotation != "" { - return nil + return node, nil } d, err := ioutil.ReadFile(constants.InitialNodeAnnotationsFilePath) if err != nil { - return fmt.Errorf("failed to read initial annotations from %q: %v", constants.InitialNodeAnnotationsFilePath, err) + return nil, fmt.Errorf("failed to read initial annotations from %q: %v", constants.InitialNodeAnnotationsFilePath, err) } var initial map[string]string - err = json.Unmarshal(d, &initial) - if err != nil { - return fmt.Errorf("failed to unmarshal initial annotations: %v", err) + if err := json.Unmarshal(d, &initial); err != nil { + return nil, fmt.Errorf("failed to unmarshal initial annotations: %v", err) } glog.Infof("Setting initial node config: %s", initial[constants.CurrentMachineConfigAnnotationKey]) - node, err := setNodeAnnotations(dn.kubeClient.CoreV1().Nodes(), dn.node.Name, initial) + n, err := setNodeAnnotations(dn.kubeClient.CoreV1().Nodes(), dn.nodeLister, node.Name, initial) if err != nil { - return fmt.Errorf("failed to set initial annotations: %v", err) + return nil, fmt.Errorf("failed to set initial annotations: %v", err) } - dn.node = node - - return nil + return n, nil } // getNodeAnnotation gets the node annotation, unsurprisingly @@ -51,37 +43,6 @@ func getNodeAnnotation(node *core_v1.Node, k string) (string, error) { return getNodeAnnotationExt(node, k, false) } -// getNode queries the kube apiserver to get the node named nodeName -func getNode(client corev1.NodeInterface, nodeName string) (*core_v1.Node, error) { - var lastErr error - var n *core_v1.Node - err := wait.PollImmediate(10*time.Second, 5*time.Minute, func() (bool, error) { - n, lastErr = client.Get(nodeName, metav1.GetOptions{}) - if lastErr == nil { - return true, nil - } - glog.Warningf("Failed to fetch node %s (%v); retrying...", nodeName, lastErr) - return false, nil - }) - if err != nil { - if err == wait.ErrWaitTimeout { - return nil, errors.Wrapf(lastErr, "timed out trying to fetch node %s", nodeName) - } - return nil, err - } - return n, nil -} - -// setInitialNode gets the node object by querying the api server when the daemon starts -func (dn *Daemon) setInitialNode(nodeName string) error { - node, err := getNode(dn.kubeClient.CoreV1().Nodes(), nodeName) - if err != nil { - return err - } - dn.node = node - return nil -} - // getNodeAnnotationExt is like getNodeAnnotation, but allows one to customize ENOENT handling func getNodeAnnotationExt(node *core_v1.Node, k string, allowNoent bool) (string, error) { v, ok := node.Annotations[k] diff --git a/pkg/daemon/update.go b/pkg/daemon/update.go index fec4629f29..da5442733b 100644 --- a/pkg/daemon/update.go +++ b/pkg/daemon/update.go @@ -5,12 +5,10 @@ import ( "encoding/json" "fmt" "io" - "io/ioutil" "os" "os/exec" "os/signal" "os/user" - "path" "path/filepath" "reflect" "strconv" @@ -20,12 +18,13 @@ import ( ignv2_2types "github.com/coreos/ignition/config/v2_2/types" "github.com/coreos/ignition/config/validate" "github.com/golang/glog" + "github.com/google/renameio" drain "github.com/openshift/kubernetes-drain" mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1" errors "github.com/pkg/errors" "github.com/vincent-petithory/dataurl" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" ) @@ -40,24 +39,38 @@ const ( coreUserSSHPath = "/home/core/.ssh/" ) -// Someone please tell me this actually lives in the stdlib somewhere -func replaceFileContentsAtomically(fpath string, b []byte) error { - f, err := ioutil.TempFile(path.Dir(fpath), "") +func writeFileAtomicallyWithDefaults(fpath string, b []byte) error { + return writeFileAtomically(fpath, b, defaultDirectoryPermissions, defaultFilePermissions, -1, -1) +} + +// writeFileAtomically uses the renameio package to provide atomic file writing, we can't use renameio.WriteFile +// directly since we need to 1) Chown 2) go through a buffer since files provided can be big +func writeFileAtomically(fpath string, b []byte, dirMode, fileMode os.FileMode, uid, gid int) error { + if err := os.MkdirAll(filepath.Dir(fpath), dirMode); err != nil { + return fmt.Errorf("failed to create directory %q: %v", filepath.Dir(fpath), err) + } + t, err := renameio.TempFile("", fpath) if err != nil { return err } - defer f.Close() - n, err := f.Write(b) - if err == nil && n < len(b) { - err = io.ErrShortWrite + defer t.Cleanup() + // Set permissions before writing data, in case the data is sensitive. + if err := t.Chmod(fileMode); err != nil { + return err } - if err != nil { + w := bufio.NewWriter(t) + if _, err := w.Write(b); err != nil { return err } - if err := os.Rename(f.Name(), fpath); err != nil { + if err := w.Flush(); err != nil { return err } - return nil + if uid != -1 && gid != -1 { + if err := t.Chown(uid, gid); err != nil { + return err + } + } + return t.CloseAtomicallyReplace() } func (dn *Daemon) writePendingState(desiredConfig *mcfgv1.MachineConfig) error { @@ -69,7 +82,15 @@ func (dn *Daemon) writePendingState(desiredConfig *mcfgv1.MachineConfig) error { if err != nil { return err } - return replaceFileContentsAtomically(pathStateJSON, b) + return writeFileAtomicallyWithDefaults(pathStateJSON, b) +} + +func getNodeRef(node *corev1.Node) *corev1.ObjectReference { + return &corev1.ObjectReference{ + Kind: "Node", + Name: node.GetName(), + UID: types.UID(node.GetUID()), + } } // updateOSAndReboot is the last step in an update(), and it can also @@ -83,7 +104,7 @@ func (dn *Daemon) updateOSAndReboot(newConfig *mcfgv1.MachineConfig) error { if dn.onceFrom == "" { glog.Info("Update prepared; draining the node") - dn.recorder.Eventf(dn.node, corev1.EventTypeNormal, "Drain", "Draining node to update config.") + dn.recorder.Eventf(getNodeRef(dn.node), corev1.EventTypeNormal, "Drain", "Draining node to update config.") backoff := wait.Backoff{ Steps: 5, @@ -145,14 +166,19 @@ func (dn *Daemon) catchIgnoreSIGTERM() { } // update the node to the provided node configuration. -func (dn *Daemon) update(oldConfig, newConfig *mcfgv1.MachineConfig) error { +func (dn *Daemon) update(oldConfig, newConfig *mcfgv1.MachineConfig) (retErr error) { if dn.nodeWriter != nil { - if err := dn.nodeWriter.SetUpdateWorking(dn.kubeClient.CoreV1().Nodes(), dn.name); err != nil { + if err := dn.nodeWriter.SetUpdateWorking(dn.kubeClient.CoreV1().Nodes(), dn.nodeLister, dn.name); err != nil { return err } } dn.catchIgnoreSIGTERM() + defer func() { + if retErr != nil { + dn.cancelSIGTERM() + } + }() oldConfigName := oldConfig.GetName() newConfigName := newConfig.GetName() @@ -163,7 +189,12 @@ func (dn *Daemon) update(oldConfig, newConfig *mcfgv1.MachineConfig) error { if reconcilableError != nil { wrappedErr := fmt.Errorf("can't reconcile config %s with %s: %v", oldConfigName, newConfigName, reconcilableError) if dn.recorder != nil { - dn.recorder.Eventf(newConfig, corev1.EventTypeWarning, "FailedToReconcile", wrappedErr.Error()) + mcRef := &corev1.ObjectReference{ + Kind: "MachineConfig", + Name: newConfig.GetName(), + UID: newConfig.GetUID(), + } + dn.recorder.Eventf(mcRef, corev1.EventTypeWarning, "FailedToReconcile", wrappedErr.Error()) } dn.logSystem(wrappedErr.Error()) return wrappedErr @@ -174,10 +205,28 @@ func (dn *Daemon) update(oldConfig, newConfig *mcfgv1.MachineConfig) error { return err } + defer func() { + if retErr != nil { + if err := dn.updateFiles(newConfig, oldConfig); err != nil { + retErr = errors.Wrapf(retErr, "error rolling back files writes %v", err) + return + } + } + }() + if err := dn.updateSSHKeys(newConfig.Spec.Config.Passwd.Users); err != nil { return err } + defer func() { + if retErr != nil { + if err := dn.updateSSHKeys(oldConfig.Spec.Config.Passwd.Users); err != nil { + retErr = errors.Wrapf(retErr, "error rolling back SSH keys updates %v", err) + return + } + } + }() + return dn.updateOSAndReboot(newConfig) } @@ -359,7 +408,7 @@ func (dn *Daemon) deleteStaleData(oldConfig, newConfig *mcfgv1.MachineConfig) er for _, f := range oldConfig.Spec.Config.Storage.Files { if _, ok := newFileSet[f.Path]; !ok { glog.V(2).Infof("Deleting stale config file: %s", f.Path) - if err := dn.fileSystemClient.Remove(f.Path); err != nil { + if err := os.Remove(f.Path); err != nil { newErr := fmt.Errorf("unable to delete %s: %s", f.Path, err) if !os.IsNotExist(err) { return newErr @@ -367,6 +416,7 @@ func (dn *Daemon) deleteStaleData(oldConfig, newConfig *mcfgv1.MachineConfig) er // otherwise, just warn glog.Warningf("%v", newErr) } + glog.Infof("Removed stale file %q", f.Path) } } @@ -386,7 +436,7 @@ func (dn *Daemon) deleteStaleData(oldConfig, newConfig *mcfgv1.MachineConfig) er path := filepath.Join(pathSystemd, u.Name+".d", u.Dropins[j].Name) if _, ok := newDropinSet[path]; !ok { glog.V(2).Infof("Deleting stale systemd dropin file: %s", path) - if err := dn.fileSystemClient.Remove(path); err != nil { + if err := os.Remove(path); err != nil { newErr := fmt.Errorf("unable to delete %s: %s", path, err) if !os.IsNotExist(err) { return newErr @@ -394,6 +444,7 @@ func (dn *Daemon) deleteStaleData(oldConfig, newConfig *mcfgv1.MachineConfig) er // otherwise, just warn glog.Warningf("%v", newErr) } + glog.Infof("Removed stale systemd dropin %q", path) } } path := filepath.Join(pathSystemd, u.Name) @@ -402,7 +453,7 @@ func (dn *Daemon) deleteStaleData(oldConfig, newConfig *mcfgv1.MachineConfig) er glog.Warningf("Unable to disable %s: %s", u.Name, err) } glog.V(2).Infof("Deleting stale systemd unit file: %s", path) - if err := dn.fileSystemClient.Remove(path); err != nil { + if err := os.Remove(path); err != nil { newErr := fmt.Errorf("unable to delete %s: %s", path, err) if !os.IsNotExist(err) { return newErr @@ -410,6 +461,7 @@ func (dn *Daemon) deleteStaleData(oldConfig, newConfig *mcfgv1.MachineConfig) er // otherwise, just warn glog.Warningf("%v", newErr) } + glog.Infof("Removed stale systemd unit %q", path) } } @@ -421,13 +473,13 @@ func (dn *Daemon) enableUnit(unit ignv2_2types.Unit) error { // The link location wantsPath := filepath.Join(wantsPathSystemd, unit.Name) // sanity check that we don't return an error when the link already exists - if _, err := dn.fileSystemClient.Stat(wantsPath); err == nil { + if _, err := os.Stat(wantsPath); err == nil { glog.Infof("%s already exists. Not making a new symlink", wantsPath) return nil } // The originating file to link servicePath := filepath.Join(pathSystemd, unit.Name) - err := dn.fileSystemClient.Symlink(servicePath, wantsPath) + err := renameio.Symlink(servicePath, wantsPath) if err != nil { glog.Warningf("Cannot enable unit %s: %s", unit.Name, err) } else { @@ -442,33 +494,27 @@ func (dn *Daemon) disableUnit(unit ignv2_2types.Unit) error { // The link location wantsPath := filepath.Join(wantsPathSystemd, unit.Name) // sanity check so we don't return an error when the unit was already disabled - if _, err := dn.fileSystemClient.Stat(wantsPath); err != nil { + if _, err := os.Stat(wantsPath); err != nil { glog.Infof("%s was not present. No need to remove", wantsPath) return nil } glog.V(2).Infof("Disabling unit at %s", wantsPath) - return dn.fileSystemClient.Remove(wantsPath) + return os.Remove(wantsPath) } // writeUnits writes the systemd units to disk func (dn *Daemon) writeUnits(units []ignv2_2types.Unit) error { - var path string for _, u := range units { // write the dropin to disk for i := range u.Dropins { glog.Infof("Writing systemd unit dropin %q", u.Dropins[i].Name) - path = filepath.Join(pathSystemd, u.Name+".d", u.Dropins[i].Name) - if err := dn.fileSystemClient.MkdirAll(filepath.Dir(path), defaultDirectoryPermissions); err != nil { - return fmt.Errorf("failed to create directory %q: %v", filepath.Dir(path), err) - } - glog.V(2).Infof("Created directory: %s", path) - - err := ioutil.WriteFile(path, []byte(u.Dropins[i].Contents), os.FileMode(0644)) - if err != nil { + dpath := filepath.Join(pathSystemd, u.Name+".d", u.Dropins[i].Name) + if err := writeFileAtomicallyWithDefaults(dpath, []byte(u.Dropins[i].Contents)); err != nil { return fmt.Errorf("failed to write systemd unit dropin %q: %v", u.Dropins[i].Name, err) } - glog.V(2).Infof("Wrote systemd unit dropin at %s", path) + + glog.V(2).Infof("Wrote systemd unit dropin at %s", dpath) } if u.Contents == "" { @@ -476,22 +522,19 @@ func (dn *Daemon) writeUnits(units []ignv2_2types.Unit) error { } glog.Infof("Writing systemd unit %q", u.Name) - path = filepath.Join(pathSystemd, u.Name) - if err := dn.fileSystemClient.MkdirAll(filepath.Dir(path), defaultDirectoryPermissions); err != nil { - return fmt.Errorf("failed to create directory %q: %v", filepath.Dir(path), err) - } - glog.V(2).Infof("Created directory: %s", path) + + fpath := filepath.Join(pathSystemd, u.Name) // check if the unit is masked. if it is, we write a symlink to // /dev/null and continue if u.Mask { glog.V(2).Info("Systemd unit masked") - if err := dn.fileSystemClient.RemoveAll(path); err != nil { + if err := os.RemoveAll(fpath); err != nil { return fmt.Errorf("failed to remove unit %q: %v", u.Name, err) } glog.V(2).Infof("Removed unit %q", u.Name) - if err := dn.fileSystemClient.Symlink(pathDevNull, path); err != nil { + if err := renameio.Symlink(pathDevNull, fpath); err != nil { return fmt.Errorf("failed to symlink unit %q to %s: %v", u.Name, pathDevNull, err) } glog.V(2).Infof("Created symlink unit %q to %s", u.Name, pathDevNull) @@ -500,10 +543,10 @@ func (dn *Daemon) writeUnits(units []ignv2_2types.Unit) error { } // write the unit to disk - err := ioutil.WriteFile(path, []byte(u.Contents), defaultFilePermissions) - if err != nil { + if err := writeFileAtomicallyWithDefaults(fpath, []byte(u.Contents)); err != nil { return fmt.Errorf("failed to write systemd unit %q: %v", u.Name, err) } + glog.V(2).Infof("Successfully wrote systemd unit %q: ", u.Name) // if the unit doesn't note if it should be enabled or disabled then @@ -540,57 +583,29 @@ func (dn *Daemon) writeUnits(units []ignv2_2types.Unit) error { // writeFiles writes the given files to disk. // it doesn't fetch remote files and expects a flattened config file. func (dn *Daemon) writeFiles(files []ignv2_2types.File) error { - for _, f := range files { - glog.Infof("Writing file %q", f.Path) - // create any required directories for the file - if err := dn.fileSystemClient.MkdirAll(filepath.Dir(f.Path), defaultDirectoryPermissions); err != nil { - return fmt.Errorf("failed to create directory %q: %v", filepath.Dir(f.Path), err) - } + for _, file := range files { + glog.Infof("Writing file %q", file.Path) - // create the file - file, err := dn.fileSystemClient.Create(f.Path) - if err != nil { - return fmt.Errorf("failed to create file %q: %v", f.Path, err) - } - defer file.Close() - - // write the file to disk, using the inlined file contents - contents, err := dataurl.DecodeString(f.Contents.Source) + contents, err := dataurl.DecodeString(file.Contents.Source) if err != nil { return err } - w := bufio.NewWriter(file) - if _, err := w.Write(contents.Data); err != nil { - return fmt.Errorf("failed to write inline contents to file %q: %v", f.Path, err) - } - w.Flush() - - // chmod and chown mode := defaultFilePermissions - if f.Mode != nil { - mode = os.FileMode(*f.Mode) + if file.Mode != nil { + mode = os.FileMode(*file.Mode) } - if err := file.Chmod(mode); err != nil { - return fmt.Errorf("failed to set file mode for file %q: %v", f.Path, err) - } - + var ( + uid, gid = -1, -1 + ) // set chown if file information is provided - if f.User != nil || f.Group != nil { - uid, gid, err := getFileOwnership(f) + if file.User != nil || file.Group != nil { + uid, gid, err = getFileOwnership(file) if err != nil { - return fmt.Errorf("failed to retrieve file ownership for file %q: %v", f.Path, err) - } - if err := file.Chown(uid, gid); err != nil { - return fmt.Errorf("failed to set file ownership for file %q: %v", f.Path, err) + return fmt.Errorf("failed to retrieve file ownership for file %q: %v", file.Path, err) } } - - if err := file.Sync(); err != nil { - return fmt.Errorf("failed to sync file %q: %v", f.Path, err) - } - - if err := file.Close(); err != nil { - return fmt.Errorf("failed to close file %q: %v", f.Path, err) + if err := writeFileAtomically(file.Path, contents.Data, defaultDirectoryPermissions, mode, uid, gid); err != nil { + return err } } return nil @@ -626,32 +641,38 @@ func getFileOwnership(file ignv2_2types.File) (int, int, error) { return uid, gid, nil } +func (dn *Daemon) atomicallyWriteSSHKey(newUser ignv2_2types.PasswdUser, keys string) error { + authKeyPath := filepath.Join(coreUserSSHPath, "authorized_keys") + + // Keys should only be written to "/home/core/.ssh" + // Once Users are supported fully this should be writing to PasswdUser.HomeDir + glog.Infof("Writing SSHKeys at %q", authKeyPath) + + if err := writeFileAtomicallyWithDefaults(authKeyPath, []byte(keys)); err != nil { + return err + } + + glog.V(2).Infof("Wrote SSHKeys at %s", authKeyPath) + + return nil +} + // Update a given PasswdUser's SSHKey func (dn *Daemon) updateSSHKeys(newUsers []ignv2_2types.PasswdUser) error { if len(newUsers) == 0 { return nil } - // Keys should only be written to "/home/core/.ssh" - // Once Users are supported fully this should be writing to PasswdUser.HomeDir - glog.Infof("Writing SSHKeys at %q", coreUserSSHPath) - if err := dn.fileSystemClient.MkdirAll(coreUserSSHPath, os.FileMode(0600)); err != nil { - return fmt.Errorf("failed to create directory %q: %v", coreUserSSHPath, err) - } - glog.V(2).Infof("Created directory: %s", coreUserSSHPath) - - authkeypath := filepath.Join(coreUserSSHPath, "authorized_keys") + // we're also appending all keys for any user to core, so for now + // we pass this to atomicallyWriteSSHKeys to write. var concatSSHKeys string for _, k := range newUsers[len(newUsers)-1].SSHAuthorizedKeys { concatSSHKeys = concatSSHKeys + string(k) + "\n" } - - if err := dn.fileSystemClient.WriteFile(authkeypath, []byte(concatSSHKeys), os.FileMode(0600)); err != nil { - return fmt.Errorf("failed to write ssh key: %v", err) + // newUsers[0] is currently unused since we write keys only for the core user + if err := dn.atomicSSHKeysWriter(newUsers[0], concatSSHKeys); err != nil { + return err } - - glog.V(2).Infof("Wrote SSHKeys at %s", coreUserSSHPath) - return nil } @@ -705,21 +726,25 @@ func (dn *Daemon) logSystem(format string, a ...interface{}) { } } +func (dn *Daemon) cancelSIGTERM() { + if dn.installedSigterm { + signal.Reset(syscall.SIGTERM) + dn.installedSigterm = false + } +} + // reboot is the final step. it tells systemd-logind to reboot the machine, // cleans up the agent's connections, and then sleeps for 7 days. if it wakes up // and manages to return, it returns a scary error message. func (dn *Daemon) reboot(rationale string) error { // We'll only have a recorder if we're cluster driven if dn.recorder != nil { - dn.recorder.Eventf(&corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: dn.name}}, corev1.EventTypeNormal, "Reboot", rationale) + dn.recorder.Eventf(getNodeRef(dn.node), corev1.EventTypeNormal, "Reboot", rationale) } dn.logSystem("machine-config-daemon initiating reboot: %s", rationale) // Now that everything is done, avoid delaying shutdown. - if dn.installedSigterm { - signal.Reset(syscall.SIGTERM) - dn.installedSigterm = false - } + dn.cancelSIGTERM() // reboot dn.loginClient.Reboot(false) diff --git a/pkg/daemon/update_test.go b/pkg/daemon/update_test.go index 4f9808fc87..2adb6e5ff4 100644 --- a/pkg/daemon/update_test.go +++ b/pkg/daemon/update_test.go @@ -6,7 +6,6 @@ import ( ignv2_2types "github.com/coreos/ignition/config/v2_2/types" mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1" - "github.com/openshift/machine-config-operator/pkg/generated/clientset/versioned/fake" "github.com/stretchr/testify/assert" k8sfake "k8s.io/client-go/kubernetes/fake" ) @@ -33,7 +32,6 @@ func TestUpdateOS(t *testing.T) { OperatingSystem: machineConfigDaemonOSRHCOS, NodeUpdaterClient: testClient, loginClient: nil, // set to nil as it will not be used within tests - client: fake.NewSimpleClientset(), kubeClient: k8sfake.NewSimpleClientset(), rootMount: "/", bootedOSImageURL: "test", @@ -67,7 +65,6 @@ func TestReconcilable(t *testing.T) { OperatingSystem: machineConfigDaemonOSRHCOS, NodeUpdaterClient: nil, loginClient: nil, - client: nil, kubeClient: nil, rootMount: "/", bootedOSImageURL: "test", @@ -208,7 +205,6 @@ func TestReconcilableSSH(t *testing.T) { OperatingSystem: machineConfigDaemonOSRHCOS, NodeUpdaterClient: testClient, loginClient: nil, // set to nil as it will not be used within tests - client: fake.NewSimpleClientset(), kubeClient: k8sfake.NewSimpleClientset(), rootMount: "/", bootedOSImageURL: "test", @@ -294,18 +290,15 @@ func TestUpdateSSHKeys(t *testing.T) { // Second rrun will return our expected error expectedError}, } - mockFS := &FsClientMock{MkdirAllReturns: []error{nil}, WriteFileReturns: []error{nil}} // Create a Daemon instance with mocked clients d := Daemon{ name: "nodeName", OperatingSystem: machineConfigDaemonOSRHCOS, NodeUpdaterClient: testClient, loginClient: nil, // set to nil as it will not be used within tests - client: fake.NewSimpleClientset(), kubeClient: k8sfake.NewSimpleClientset(), rootMount: "/", bootedOSImageURL: "test", - fileSystemClient: mockFS, } // Set up machineconfigs that are identical except for SSH keys tempUser := ignv2_2types.PasswdUser{Name: "core", SSHAuthorizedKeys: []ignv2_2types.SSHAuthorizedKey{"1234", "4567"}} @@ -319,6 +312,9 @@ func TestUpdateSSHKeys(t *testing.T) { }, }, } + + d.atomicSSHKeysWriter = func(user ignv2_2types.PasswdUser, keys string) error { return nil } + err := d.updateSSHKeys(newMcfg.Spec.Config.Passwd.Users) if err != nil { t.Errorf("Expected no error. Got %s.", err) @@ -348,18 +344,15 @@ func TestInvalidIgnConfig(t *testing.T) { // Second rrun will return our expected error expectedError}, } - mockFS := &FsClientMock{MkdirAllReturns: []error{nil}, WriteFileReturns: []error{nil}} // Create a Daemon instance with mocked clients d := Daemon{ name: "nodeName", OperatingSystem: machineConfigDaemonOSRHCOS, NodeUpdaterClient: testClient, loginClient: nil, // set to nil as it will not be used within tests - client: fake.NewSimpleClientset(), kubeClient: k8sfake.NewSimpleClientset(), rootMount: "/", bootedOSImageURL: "test", - fileSystemClient: mockFS, } oldMcfg := &mcfgv1.MachineConfig{ diff --git a/pkg/daemon/writer.go b/pkg/daemon/writer.go index 16bcb65e27..b9bad60384 100644 --- a/pkg/daemon/writer.go +++ b/pkg/daemon/writer.go @@ -1,12 +1,16 @@ package daemon import ( + "encoding/json" "fmt" "github.com/golang/glog" "github.com/openshift/machine-config-operator/pkg/daemon/constants" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/strategicpatch" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" + corelisterv1 "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/util/retry" ) @@ -23,6 +27,7 @@ const ( // message wraps a client and responseChannel type message struct { client corev1.NodeInterface + lister corelisterv1.NodeLister node string annos map[string]string responseChannel chan error @@ -48,14 +53,14 @@ func (nw *NodeWriter) Run(stop <-chan struct{}) { case <-stop: return case msg := <-nw.writer: - _, err := setNodeAnnotations(msg.client, msg.node, msg.annos) + _, err := setNodeAnnotations(msg.client, msg.lister, msg.node, msg.annos) msg.responseChannel <- err } } } // SetUpdateDone Sets the state to UpdateDone. -func (nw *NodeWriter) SetUpdateDone(client corev1.NodeInterface, node string, dcAnnotation string) error { +func (nw *NodeWriter) SetUpdateDone(client corev1.NodeInterface, lister corelisterv1.NodeLister, node string, dcAnnotation string) error { annos := map[string]string{ constants.MachineConfigDaemonStateAnnotationKey: constants.MachineConfigDaemonStateDone, constants.CurrentMachineConfigAnnotationKey: dcAnnotation, @@ -63,6 +68,7 @@ func (nw *NodeWriter) SetUpdateDone(client corev1.NodeInterface, node string, dc respChan := make(chan error, 1) nw.writer <- message{ client: client, + lister: lister, node: node, annos: annos, responseChannel: respChan, @@ -71,13 +77,14 @@ func (nw *NodeWriter) SetUpdateDone(client corev1.NodeInterface, node string, dc } // SetUpdateWorking Sets the state to UpdateWorking. -func (nw *NodeWriter) SetUpdateWorking(client corev1.NodeInterface, node string) error { +func (nw *NodeWriter) SetUpdateWorking(client corev1.NodeInterface, lister corelisterv1.NodeLister, node string) error { annos := map[string]string{ constants.MachineConfigDaemonStateAnnotationKey: constants.MachineConfigDaemonStateWorking, } respChan := make(chan error, 1) nw.writer <- message{ client: client, + lister: lister, node: node, annos: annos, responseChannel: respChan, @@ -85,9 +92,9 @@ func (nw *NodeWriter) SetUpdateWorking(client corev1.NodeInterface, node string) return <-respChan } -// SetUpdateDegraded logs the error and sets the state to UpdateDegraded. +// SetDegraded logs the error and sets the state to UpdateDegraded. // Returns an error if it couldn't set the annotation. -func (nw *NodeWriter) SetUpdateDegraded(err error, client corev1.NodeInterface, node string) error { +func (nw *NodeWriter) SetDegraded(err error, client corev1.NodeInterface, lister corelisterv1.NodeLister, node string) error { glog.Errorf("marking degraded due to: %v", err) annos := map[string]string{ constants.MachineConfigDaemonStateAnnotationKey: constants.MachineConfigDaemonStateDegraded, @@ -95,42 +102,27 @@ func (nw *NodeWriter) SetUpdateDegraded(err error, client corev1.NodeInterface, respChan := make(chan error, 1) nw.writer <- message{ client: client, + lister: lister, node: node, annos: annos, responseChannel: respChan, } - return <-respChan -} - -// SetUpdateDegradedIgnoreErr logs the error and sets the state to -// UpdateDegraded. Logs an error if if couldn't set the annotation. Always -// returns the same error that it was passed. This is useful in situations -// where one just wants to return an error to its caller after having set the -// node to degraded due to that error. -func (nw *NodeWriter) SetUpdateDegradedIgnoreErr(err error, client corev1.NodeInterface, node string) error { - // log error here since the caller won't look at it - degradedErr := nw.SetUpdateDegraded(err, client, node) - if degradedErr != nil { - glog.Errorf("error while setting degraded: %v", degradedErr) + clientErr := <-respChan + if clientErr != nil { + glog.Errorf("Error setting degraded annotation for node %s: %v", node, clientErr) } - return err -} - -// SetUpdateDegradedMsgIgnoreErr is like SetUpdateDegradedMsgIgnoreErr but -// takes a string and constructs the error object itself. -func (nw *NodeWriter) SetUpdateDegradedMsgIgnoreErr(msg string, client corev1.NodeInterface, node string) error { - err := fmt.Errorf(msg) - return nw.SetUpdateDegradedIgnoreErr(err, client, node) + return clientErr } // SetSSHAccessed sets the ssh annotation to accessed -func (nw *NodeWriter) SetSSHAccessed(client corev1.NodeInterface, node string) error { +func (nw *NodeWriter) SetSSHAccessed(client corev1.NodeInterface, lister corelisterv1.NodeLister, node string) error { annos := map[string]string{ machineConfigDaemonSSHAccessAnnotationKey: machineConfigDaemonSSHAccessValue, } respChan := make(chan error, 1) nw.writer <- message{ client: client, + lister: lister, node: node, annos: annos, responseChannel: respChan, @@ -143,31 +135,42 @@ func (nw *NodeWriter) SetSSHAccessed(client corev1.NodeInterface, node string) e // number of times. // f will be called each time since the node object will likely have changed if // a retry is necessary. -func updateNodeRetry(client corev1.NodeInterface, nodeName string, f func(*v1.Node)) (*v1.Node, error) { +func updateNodeRetry(client corev1.NodeInterface, lister corelisterv1.NodeLister, nodeName string, f func(*v1.Node)) (*v1.Node, error) { var node *v1.Node - err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { - n, getErr := getNode(client, nodeName) - if getErr != nil { - return getErr + if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + n, err := lister.Get(nodeName) + if err != nil { + return err + } + oldNode, err := json.Marshal(n) + if err != nil { + return err } - // Call the node modifier. - f(n) + nodeClone := n.DeepCopy() + f(nodeClone) - var err error - node, err = client.Update(n) + newNode, err := json.Marshal(nodeClone) + if err != nil { + return err + } + + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldNode, newNode, v1.Node{}) + if err != nil { + return fmt.Errorf("failed to create patch for node %q: %v", nodeName, err) + } + + node, err = client.Patch(nodeName, types.StrategicMergePatchType, patchBytes) return err - }) - if err != nil { + }); err != nil { // may be conflict if max retries were hit return nil, fmt.Errorf("unable to update node %q: %v", node, err) } - return node, nil } -func setNodeAnnotations(client corev1.NodeInterface, nodeName string, m map[string]string) (*v1.Node, error) { - node, err := updateNodeRetry(client, nodeName, func(node *v1.Node) { +func setNodeAnnotations(client corev1.NodeInterface, lister corelisterv1.NodeLister, nodeName string, m map[string]string) (*v1.Node, error) { + node, err := updateNodeRetry(client, lister, nodeName, func(node *v1.Node) { for k, v := range m { node.Annotations[k] = v } diff --git a/vendor/github.com/google/renameio/LICENSE b/vendor/github.com/google/renameio/LICENSE new file mode 100644 index 0000000000..d645695673 --- /dev/null +++ b/vendor/github.com/google/renameio/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/google/renameio/doc.go b/vendor/github.com/google/renameio/doc.go new file mode 100644 index 0000000000..67416df481 --- /dev/null +++ b/vendor/github.com/google/renameio/doc.go @@ -0,0 +1,21 @@ +// Copyright 2018 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package renameio provides a way to atomically create or replace a file or +// symbolic link. +// +// Caveat: this package requires the file system rename(2) implementation to be +// atomic. Notably, this is not the case when using NFS with multiple clients: +// https://stackoverflow.com/a/41396801 +package renameio diff --git a/vendor/github.com/google/renameio/tempfile.go b/vendor/github.com/google/renameio/tempfile.go new file mode 100644 index 0000000000..1860b54a99 --- /dev/null +++ b/vendor/github.com/google/renameio/tempfile.go @@ -0,0 +1,175 @@ +// Copyright 2018 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package renameio + +import ( + "io/ioutil" + "os" + "path/filepath" +) + +// TempDir checks whether os.TempDir() can be used as a temporary directory for +// later atomically replacing files within dest. If no (os.TempDir() resides on +// a different mount point), dest is returned. +// +// Note that the returned value ceases to be valid once either os.TempDir() +// changes (e.g. on Linux, once the TMPDIR environment variable changes) or the +// file system is unmounted. +func TempDir(dest string) string { + return tempDir("", filepath.Join(dest, "renameio-TempDir")) +} + +func tempDir(dir, dest string) string { + if dir != "" { + return dir // caller-specified directory always wins + } + + // Chose the destination directory as temporary directory so that we + // definitely can rename the file, for which both temporary and destination + // file need to point to the same mount point. + fallback := filepath.Dir(dest) + + // The user might have overridden the os.TempDir() return value by setting + // the TMPDIR environment variable. + tmpdir := os.TempDir() + + testsrc, err := ioutil.TempFile(tmpdir, "."+filepath.Base(dest)) + if err != nil { + return fallback + } + cleanup := true + defer func() { + if cleanup { + os.Remove(testsrc.Name()) + } + }() + testsrc.Close() + + testdest, err := ioutil.TempFile(filepath.Dir(dest), "."+filepath.Base(dest)) + if err != nil { + return fallback + } + defer os.Remove(testdest.Name()) + testdest.Close() + + if err := os.Rename(testsrc.Name(), testdest.Name()); err != nil { + return fallback + } + cleanup = false // testsrc no longer exists + return tmpdir +} + +// PendingFile is a pending temporary file, waiting to replace the destination +// path in a call to CloseAtomicallyReplace. +type PendingFile struct { + *os.File + + path string + done bool + closed bool +} + +// Cleanup is a no-op if CloseAtomicallyReplace succeeded, and otherwise closes +// and removes the temporary file. +func (t *PendingFile) Cleanup() error { + if t.done { + return nil + } + // An error occurred. Close and remove the tempfile. Errors are returned for + // reporting, there is nothing the caller can recover here. + var closeErr error + if !t.closed { + closeErr = t.Close() + } + if err := os.Remove(t.Name()); err != nil { + return err + } + return closeErr +} + +// CloseAtomicallyReplace closes the temporary file and atomically replaces +// the destination file with it, i.e., a concurrent open(2) call will either +// open the file previously located at the destination path (if any), or the +// just written file, but the file will always be present. +func (t *PendingFile) CloseAtomicallyReplace() error { + // Even on an ordered file system (e.g. ext4 with data=ordered) or file + // systems with write barriers, we cannot skip the fsync(2) call as per + // Theodore Ts'o (ext2/3/4 lead developer): + // + // > data=ordered only guarantees the avoidance of stale data (e.g., the previous + // > contents of a data block showing up after a crash, where the previous data + // > could be someone's love letters, medical records, etc.). Without the fsync(2) + // > a zero-length file is a valid and possible outcome after the rename. + if err := t.Sync(); err != nil { + return err + } + t.closed = true + if err := t.Close(); err != nil { + return err + } + if err := os.Rename(t.Name(), t.path); err != nil { + return err + } + t.done = true + return nil +} + +// TempFile wraps ioutil.TempFile for the use case of atomically creating or +// replacing the destination file at path. +// +// If dir is the empty string, TempDir(filepath.Base(path)) is used. If you are +// going to write a large number of files to the same file system, store the +// result of TempDir(filepath.Base(path)) and pass it instead of the empty +// string. +// +// The file's permissions will be 0600 by default. You can change these by +// explicitly calling Chmod on the returned PendingFile. +func TempFile(dir, path string) (*PendingFile, error) { + f, err := ioutil.TempFile(tempDir(dir, path), "."+filepath.Base(path)) + if err != nil { + return nil, err + } + + return &PendingFile{File: f, path: path}, nil +} + +// Symlink wraps os.Symlink, replacing an existing symlink with the same name +// atomically (os.Symlink fails when newname already exists, at least on Linux). +func Symlink(oldname, newname string) error { + // Fast path: if newname does not exist yet, we can skip the whole dance + // below. + if err := os.Symlink(oldname, newname); err == nil || !os.IsExist(err) { + return err + } + + // We need to use ioutil.TempDir, as we cannot overwrite a ioutil.TempFile, + // and removing+symlinking creates a TOCTOU race. + d, err := ioutil.TempDir(filepath.Dir(newname), "."+filepath.Base(newname)) + if err != nil { + return err + } + defer os.RemoveAll(d) + + symlink := filepath.Join(d, "tmp.symlink") + if err := os.Symlink(oldname, symlink); err != nil { + return err + } + + if err := os.Rename(symlink, newname); err != nil { + return err + } + + return os.RemoveAll(d) +} diff --git a/vendor/github.com/google/renameio/writefile.go b/vendor/github.com/google/renameio/writefile.go new file mode 100644 index 0000000000..187d2d4fa9 --- /dev/null +++ b/vendor/github.com/google/renameio/writefile.go @@ -0,0 +1,38 @@ +// Copyright 2018 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package renameio + +import "os" + +// WriteFile mirrors ioutil.WriteFile, replacing an existing file with the same +// name atomically. +func WriteFile(filename string, data []byte, perm os.FileMode) error { + t, err := TempFile("", filename) + if err != nil { + return err + } + defer t.Cleanup() + + // Set permissions before writing data, in case the data is sensitive. + if err := t.Chmod(perm); err != nil { + return err + } + + if _, err := t.Write(data); err != nil { + return err + } + + return t.CloseAtomicallyReplace() +}