diff --git a/cmd/node-joiner/main.go b/cmd/node-joiner/main.go index 84116634b2e..7f6b014c67b 100644 --- a/cmd/node-joiner/main.go +++ b/cmd/node-joiner/main.go @@ -34,7 +34,22 @@ func main() { Use: "monitor-add-nodes", Short: "Monitors the configured nodes while they are joining an existing cluster", RunE: func(cmd *cobra.Command, args []string) error { - return nodejoiner.NewMonitorAddNodesCommand("") + dir, err := cmd.Flags().GetString("dir") + if err != nil { + return err + } + + kubeConfig, err := cmd.Flags().GetString("kubeconfig") + if err != nil { + return err + } + + ips := args + logrus.Infof("Monitoring IPs: %v", ips) + if len(ips) == 0 { + logrus.Fatal("At least one IP address must be specified") + } + return nodejoiner.NewMonitorAddNodesCommand(dir, kubeConfig, ips) }, } @@ -74,8 +89,9 @@ func runRootCmd(cmd *cobra.Command, args []string) { // Overriding it here allows the same check to be done, but against the // hook's output instead of the logger's output. ForceColors: terminal.IsTerminal(int(os.Stderr.Fd())), - DisableTimestamp: true, DisableLevelTruncation: true, + DisableTimestamp: false, + FullTimestamp: true, DisableQuote: true, })) diff --git a/cmd/openshift-install/agent/waitfor.go b/cmd/openshift-install/agent/waitfor.go index cbb4f3b1327..f3992e78b7c 100644 --- a/cmd/openshift-install/agent/waitfor.go +++ b/cmd/openshift-install/agent/waitfor.go @@ -2,6 +2,7 @@ package agent import ( "context" + "path/filepath" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -9,6 +10,7 @@ import ( "github.com/openshift/installer/cmd/openshift-install/command" agentpkg "github.com/openshift/installer/pkg/agent" + "github.com/openshift/installer/pkg/asset/agent/workflow" ) const ( @@ -62,8 +64,15 @@ func newWaitForBootstrapCompleteCmd() *cobra.Command { logrus.Fatal("No cluster installation directory found") } + kubeconfigPath := filepath.Join(assetDir, "auth", "kubeconfig") + + rendezvousIP, sshKey, err := agentpkg.FindRendezvouIPAndSSHKeyFromAssetStore(assetDir) + if err != nil { + logrus.Fatal(err) + } + ctx := context.Background() - cluster, err := agentpkg.NewCluster(ctx, assetDir) + cluster, err := agentpkg.NewCluster(ctx, assetDir, rendezvousIP, kubeconfigPath, sshKey, workflow.AgentWorkflowTypeInstall) if err != nil { logrus.Exit(exitCodeBootstrapFailed) } @@ -90,8 +99,15 @@ func newWaitForInstallCompleteCmd() *cobra.Command { logrus.Fatal("No cluster installation directory found") } + kubeconfigPath := filepath.Join(assetDir, "auth", "kubeconfig") + + rendezvousIP, sshKey, err := agentpkg.FindRendezvouIPAndSSHKeyFromAssetStore(assetDir) + if err != nil { + logrus.Fatal(err) + } + ctx := context.Background() - cluster, err := agentpkg.NewCluster(ctx, assetDir) + cluster, err := agentpkg.NewCluster(ctx, assetDir, rendezvousIP, kubeconfigPath, sshKey, workflow.AgentWorkflowTypeInstall) if err != nil { logrus.Exit(exitCodeBootstrapFailed) } diff --git a/pkg/agent/cluster.go b/pkg/agent/cluster.go index 150827e1460..1ea29b8c688 100644 --- a/pkg/agent/cluster.go +++ b/pkg/agent/cluster.go @@ -2,6 +2,7 @@ package agent import ( "context" + "fmt" "net" "os" "path/filepath" @@ -14,6 +15,7 @@ import ( "github.com/openshift/assisted-service/client/installer" "github.com/openshift/assisted-service/models" + "github.com/openshift/installer/pkg/asset/agent/workflow" "github.com/openshift/installer/pkg/gather/ssh" ) @@ -27,6 +29,7 @@ type Cluster struct { clusterID *strfmt.UUID clusterInfraEnvID *strfmt.UUID installHistory *clusterInstallStatusHistory + workflow 
workflow.AgentWorkflowType } type clientSet struct { @@ -63,21 +66,20 @@ type clusterInstallStatusHistory struct { } // NewCluster initializes a Cluster object -func NewCluster(ctx context.Context, assetDir string) (*Cluster, error) { - +func NewCluster(ctx context.Context, assetDir, rendezvousIP, kubeconfigPath, sshKey string, workflowType workflow.AgentWorkflowType) (*Cluster, error) { czero := &Cluster{} capi := &clientSet{} - restclient, err := NewNodeZeroRestClient(ctx, assetDir) + restclient, err := NewNodeZeroRestClient(ctx, rendezvousIP, sshKey) if err != nil { logrus.Fatal(err) } - kubeclient, err := NewClusterKubeAPIClient(ctx, assetDir) + kubeclient, err := NewClusterKubeAPIClient(ctx, kubeconfigPath) if err != nil { logrus.Fatal(err) } - ocpclient, err := NewClusterOpenShiftAPIClient(ctx, assetDir) + ocpclient, err := NewClusterOpenShiftAPIClient(ctx, kubeconfigPath) if err != nil { logrus.Fatal(err) } @@ -108,6 +110,7 @@ func NewCluster(ctx context.Context, assetDir string) (*Cluster, error) { czero.Ctx = ctx czero.API = capi + czero.workflow = workflowType czero.clusterID = nil czero.clusterInfraEnvID = nil czero.assetDir = assetDir @@ -167,7 +170,6 @@ func (czero *Cluster) IsBootstrapComplete() (bool, bool, error) { if configmap { logrus.Info("Bootstrap configMap status is complete") czero.installHistory.ClusterBootstrapComplete = true - return true, false, nil } if err != nil { logrus.Debug(err) @@ -176,105 +178,133 @@ func (czero *Cluster) IsBootstrapComplete() (bool, bool, error) { // Agent Rest API is available if agentRestAPILive { - - // First time we see the agent Rest API - if !czero.installHistory.RestAPISeen { - logrus.Debug("Agent Rest API Initialized") - czero.installHistory.RestAPISeen = true - czero.installHistory.NotReadyTime = time.Now() + exitOnErr, err := czero.MonitorStatusFromAssistedService() + if err != nil { + return false, exitOnErr, err } + } - // Lazy loading of the clusterID and clusterInfraEnvID - if czero.clusterID == nil { - clusterID, err := czero.API.Rest.getClusterID() - if err != nil { - return false, false, errors.Wrap(err, "Unable to retrieve clusterID from Agent Rest API") - } - czero.clusterID = clusterID - } + // cluster bootstrap is not complete + return false, false, nil +} - if czero.clusterInfraEnvID == nil { - clusterInfraEnvID, err := czero.API.Rest.getClusterInfraEnvID() - if err != nil { - return false, false, errors.Wrap(err, "Unable to retrieve clusterInfraEnvID from Agent Rest API") - } - czero.clusterInfraEnvID = clusterInfraEnvID - } +// MonitorStatusFromAssistedService (exit-on-error, returned-error) +// checks if the Assisted Service API is up, and both cluster and +// infraenv have been registered. +// +// After those preconditions are met, +// it then reports on the host validation status and overall cluster +// status and updates the cluster's install history. +// +// After cluster or host installation has started, new events from +// the Assisted Service API are also logged and updated to the cluster's +// install history. 
+func (czero *Cluster) MonitorStatusFromAssistedService() (bool, error) { + resource := "cluster" + logPrefix := "" + if czero.workflow == workflow.AgentWorkflowTypeAddNodes { + resource = "host" + logPrefix = fmt.Sprintf("Node %s: ", czero.API.Rest.NodeZeroIP) + } - // Getting cluster metadata from Agent Rest API - clusterMetadata, err := czero.GetClusterRestAPIMetadata() + // First time we see the agent Rest API + if !czero.installHistory.RestAPISeen { + logrus.Debugf("%sAgent Rest API Initialized", logPrefix) + czero.installHistory.RestAPISeen = true + czero.installHistory.NotReadyTime = time.Now() + } + + // Lazy loading of the clusterID and clusterInfraEnvID + if czero.clusterID == nil { + clusterID, err := czero.API.Rest.getClusterID() if err != nil { - return false, false, errors.Wrap(err, "Unable to retrieve cluster metadata from Agent Rest API") + return false, errors.Wrap(err, "Unable to retrieve clusterID from Agent Rest API") } + czero.clusterID = clusterID + } - if clusterMetadata == nil { - return false, false, errors.New("cluster metadata returned nil from Agent Rest API") + if czero.clusterInfraEnvID == nil { + clusterInfraEnvID, err := czero.API.Rest.getClusterInfraEnvID() + if err != nil { + return false, errors.Wrap(err, "Unable to retrieve clusterInfraEnvID from Agent Rest API") } + czero.clusterInfraEnvID = clusterInfraEnvID + } + + // Getting cluster metadata from Agent Rest API + clusterMetadata, err := czero.GetClusterRestAPIMetadata() + if err != nil { + return false, errors.Wrap(err, "Unable to retrieve cluster metadata from Agent Rest API") + } + + if clusterMetadata == nil { + return false, errors.New("cluster metadata returned nil from Agent Rest API") + } - czero.PrintInstallStatus(clusterMetadata) + czero.PrintInstallStatus(clusterMetadata) - // If status indicates pending action, log host info to help pinpoint what is missing - if (*clusterMetadata.Status != czero.installHistory.RestAPIPreviousClusterStatus) && - (*clusterMetadata.Status == models.ClusterStatusInstallingPendingUserAction) { - for _, host := range clusterMetadata.Hosts { - if *host.Status == models.ClusterStatusInstallingPendingUserAction { + // If status indicates pending action, log host info to help pinpoint what is missing + if (*clusterMetadata.Status != czero.installHistory.RestAPIPreviousClusterStatus) && + (*clusterMetadata.Status == models.ClusterStatusInstallingPendingUserAction) { + for _, host := range clusterMetadata.Hosts { + if *host.Status == models.ClusterStatusInstallingPendingUserAction { + if logPrefix != "" { + logrus.Warningf("%s%s %s", logPrefix, host.RequestedHostname, *host.StatusInfo) + } else { logrus.Warningf("Host %s %s", host.RequestedHostname, *host.StatusInfo) } } } + } - if *clusterMetadata.Status == models.ClusterStatusReady { - stuck, err := czero.IsClusterStuckInReady() - if err != nil { - return false, stuck, err - } - } else { - czero.installHistory.NotReadyTime = time.Now() + if *clusterMetadata.Status == models.ClusterStatusReady { + stuck, err := czero.IsClusterStuckInReady() + if err != nil { + return stuck, err } + } else { + czero.installHistory.NotReadyTime = time.Now() + } - czero.installHistory.RestAPIPreviousClusterStatus = *clusterMetadata.Status + czero.installHistory.RestAPIPreviousClusterStatus = *clusterMetadata.Status - installing, _ := czero.IsInstalling(*clusterMetadata.Status) - if !installing { - errored, _ := czero.HasErrored(*clusterMetadata.Status) - if errored { - return false, false, errors.New("cluster has stopped installing... 
working to recover installation") - } else if *clusterMetadata.Status == models.ClusterStatusCancelled { - return false, true, errors.New("cluster installation was cancelled") - } + installing, _ := czero.IsInstalling(*clusterMetadata.Status) + if !installing { + errored, _ := czero.HasErrored(*clusterMetadata.Status) + if errored { + return false, fmt.Errorf("%s has stopped installing... working to recover installation", resource) + } else if *clusterMetadata.Status == models.ClusterStatusCancelled { + return true, fmt.Errorf("%s installation was cancelled", resource) } + } - validationsErr := checkValidations(clusterMetadata, czero.installHistory.ValidationResults, logrus.StandardLogger()) - if validationsErr != nil { - return false, false, errors.Wrap(validationsErr, "cluster host validations failed") + validationsErr := checkValidations(clusterMetadata, czero.installHistory.ValidationResults, logrus.StandardLogger(), logPrefix) + if validationsErr != nil { + return false, errors.Wrap(validationsErr, "host validations failed") - } + } - // Print most recent event associated with the clusterInfraEnvID - eventList, err := czero.API.Rest.GetInfraEnvEvents(czero.clusterInfraEnvID) - if err != nil { - return false, false, errors.Wrap(err, "Unable to retrieve events about the cluster from the Agent Rest API") - } - if len(eventList) == 0 { - // No cluster events detected from the Agent Rest API - } else { - mostRecentEvent := eventList[len(eventList)-1] - // Don't print the same status message back to back - if *mostRecentEvent.Message != czero.installHistory.RestAPIPreviousEventMessage { - if *mostRecentEvent.Severity == models.EventSeverityInfo { - logrus.Info(*mostRecentEvent.Message) - } else { - logrus.Warn(*mostRecentEvent.Message) - } + // Print most recent event associated with the clusterInfraEnvID + eventList, err := czero.API.Rest.GetInfraEnvEvents(czero.clusterInfraEnvID) + if err != nil { + return false, errors.Wrap(err, fmt.Sprintf("Unable to retrieve events about the %s from the Agent Rest API", resource)) + } + if len(eventList) == 0 { + // No cluster events detected from the Agent Rest API + } else { + mostRecentEvent := eventList[len(eventList)-1] + // Don't print the same status message back to back + if *mostRecentEvent.Message != czero.installHistory.RestAPIPreviousEventMessage { + if *mostRecentEvent.Severity == models.EventSeverityInfo { + logrus.Infof("%s%s", logPrefix, *mostRecentEvent.Message) + } else { + logrus.Warnf("%s%s", logPrefix, *mostRecentEvent.Message) } - czero.installHistory.RestAPIPreviousEventMessage = *mostRecentEvent.Message - czero.installHistory.RestAPIInfraEnvEventList = eventList } - + czero.installHistory.RestAPIPreviousEventMessage = *mostRecentEvent.Message + czero.installHistory.RestAPIInfraEnvEventList = eventList } - - // cluster bootstrap is not complete - return false, false, nil + return false, nil } // IsInstallComplete Determine if the cluster has completed installation. @@ -429,15 +459,12 @@ func (czero *Cluster) PrintInstallationComplete() error { } // PrintInstallStatus Print a human friendly message using the models from the Agent Rest API. 
-func (czero *Cluster) PrintInstallStatus(cluster *models.Cluster) error { - - friendlyStatus := humanFriendlyClusterInstallStatus(*cluster.Status) +func (czero *Cluster) PrintInstallStatus(cluster *models.Cluster) { + friendlyStatus := czero.humanFriendlyClusterInstallStatus(*cluster.Status) // Don't print the same status message back to back if *cluster.Status != czero.installHistory.RestAPIPreviousClusterStatus { logrus.Info(friendlyStatus) } - - return nil } // CanSSHToNodeZero Checks if ssh to NodeZero succeeds. @@ -453,7 +480,7 @@ func (czero *Cluster) CanSSHToNodeZero() bool { } // Human friendly install status strings mapped to the Agent Rest API cluster statuses -func humanFriendlyClusterInstallStatus(status string) string { +func (czero *Cluster) humanFriendlyClusterInstallStatus(status string) string { clusterStoppedInstallingStates := map[string]string{ models.ClusterStatusAddingHosts: "Cluster is adding hosts", models.ClusterStatusCancelled: "Cluster installation cancelled", @@ -466,6 +493,10 @@ func humanFriendlyClusterInstallStatus(status string) string { models.ClusterStatusPreparingForInstallation: "Preparing cluster for installation", models.ClusterStatusReady: "Cluster is ready for install", } - return clusterStoppedInstallingStates[status] - + switch czero.workflow { + case workflow.AgentWorkflowTypeAddNodes: + return fmt.Sprintf("Node %s: %s", czero.API.Rest.NodeZeroIP, clusterStoppedInstallingStates[status]) + default: + return clusterStoppedInstallingStates[status] + } } diff --git a/pkg/agent/kube.go b/pkg/agent/kube.go index 09de24d4467..7530981fb42 100644 --- a/pkg/agent/kube.go +++ b/pkg/agent/kube.go @@ -2,12 +2,14 @@ package agent import ( "context" - "path/filepath" "github.com/pkg/errors" "github.com/sirupsen/logrus" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + certificatesv1 "k8s.io/api/certificates/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + certificatesClient "k8s.io/client-go/kubernetes/typed/certificates/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" ) @@ -15,17 +17,17 @@ import ( // ClusterKubeAPIClient is a kube client to interact with the cluster that agent installer is installing. type ClusterKubeAPIClient struct { Client *kubernetes.Clientset + csrClient certificatesClient.CertificateSigningRequestInterface ctx context.Context config *rest.Config configPath string } // NewClusterKubeAPIClient Create a new kube client to interact with the cluster under install. 
-func NewClusterKubeAPIClient(ctx context.Context, assetDir string) (*ClusterKubeAPIClient, error) { +func NewClusterKubeAPIClient(ctx context.Context, kubeconfigPath string) (*ClusterKubeAPIClient, error) { kubeClient := &ClusterKubeAPIClient{} - kubeconfigpath := filepath.Join(assetDir, "auth", "kubeconfig") - kubeconfig, err := clientcmd.BuildConfigFromFlags("", kubeconfigpath) + kubeconfig, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath) if err != nil { return nil, errors.Wrap(err, "error loading kubeconfig from assets") } @@ -35,10 +37,13 @@ func NewClusterKubeAPIClient(ctx context.Context, assetDir string) (*ClusterKube return nil, errors.Wrap(err, "creating a Kubernetes client from assets failed") } + csrClient := kubeclient.CertificatesV1().CertificateSigningRequests() + kubeClient.Client = kubeclient + kubeClient.csrClient = csrClient kubeClient.ctx = ctx kubeClient.config = kubeconfig - kubeClient.configPath = kubeconfigpath + kubeClient.configPath = kubeconfigPath return kubeClient, nil } @@ -62,7 +67,7 @@ func (kube *ClusterKubeAPIClient) DoesKubeConfigExist() (bool, error) { // IsBootstrapConfigMapComplete Detemine if the cluster's bootstrap configmap has the status complete. func (kube *ClusterKubeAPIClient) IsBootstrapConfigMapComplete() (bool, error) { // Get latest version of bootstrap configmap - bootstrap, err := kube.Client.CoreV1().ConfigMaps("kube-system").Get(kube.ctx, "bootstrap", v1.GetOptions{}) + bootstrap, err := kube.Client.CoreV1().ConfigMaps("kube-system").Get(kube.ctx, "bootstrap", metav1.GetOptions{}) if err != nil { // bootstrap configmap not found @@ -81,3 +86,21 @@ func (kube *ClusterKubeAPIClient) IsBootstrapConfigMapComplete() (bool, error) { } return false, nil } + +// ListNodes returns a list of nodes that have joined the cluster. +func (kube *ClusterKubeAPIClient) ListNodes() (*corev1.NodeList, error) { + nodeList, err := kube.Client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return &corev1.NodeList{}, err + } + return nodeList, nil +} + +// ListCSRs returns a list of this cluster's CSRs. 
+func (kube *ClusterKubeAPIClient) ListCSRs() (*certificatesv1.CertificateSigningRequestList, error) {
+	csrs, err := kube.csrClient.List(context.TODO(), metav1.ListOptions{})
+	if err != nil {
+		return nil, err
+	}
+	return csrs, nil
+}
diff --git a/pkg/agent/monitoraddnodes.go b/pkg/agent/monitoraddnodes.go
new file mode 100644
index 00000000000..8f8929225a2
--- /dev/null
+++ b/pkg/agent/monitoraddnodes.go
@@ -0,0 +1,321 @@
+package agent
+
+import (
+	"context"
+	"crypto/x509"
+	"encoding/pem"
+	"fmt"
+	"net"
+	"net/http"
+	"strings"
+	"time"
+
+	"github.com/pkg/errors"
+	"github.com/sirupsen/logrus"
+	certificatesv1 "k8s.io/api/certificates/v1"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
+)
+
+const (
+	firstCSRSignerName  = "kubernetes.io/kube-apiserver-client-kubelet"
+	secondCSRSignerName = "kubernetes.io/kubelet-serving"
+)
+
+type addNodeStatusHistory struct {
+	RestAPISeen            bool
+	KubeletIsRunningOnNode bool
+	FirstCSRSeen           bool
+	SecondCSRSeen          bool
+	NodeJoinedCluster      bool
+	NodeIsReady            bool
+}
+
+type addNodeMonitor struct {
+	nodeIPAddress string
+	hostnames     []string
+	cluster       *Cluster
+	status        addNodeStatusHistory
+}
+
+func newAddNodeMonitor(nodeIP string, cluster *Cluster) (*addNodeMonitor, error) {
+	parsedIPAddress := net.ParseIP(nodeIP)
+	if parsedIPAddress == nil {
+		return nil, fmt.Errorf("%s is not a valid IP address", nodeIP)
+	}
+	mon := addNodeMonitor{
+		nodeIPAddress: parsedIPAddress.String(),
+		cluster:       cluster,
+		status: addNodeStatusHistory{
+			RestAPISeen:            false,
+			KubeletIsRunningOnNode: false,
+			FirstCSRSeen:           false,
+			SecondCSRSeen:          false,
+			NodeJoinedCluster:      false,
+			NodeIsReady:            false,
+		},
+	}
+	hostnames, err := net.LookupAddr(nodeIP)
+	if err != nil {
+		logrus.Infof("Cannot resolve IP address %v to a hostname. Skipping checks for pending CSRs.", nodeIP)
+	} else {
+		mon.hostnames = hostnames
+	}
+	return &mon, nil
+}
+
+func (mon *addNodeMonitor) logStatus(status string) {
+	logrus.Infof("Node %s: %s", mon.nodeIPAddress, status)
+}
+
+// MonitorAddNodes waits for a node to be added to the cluster
+// and reports its status until it becomes Ready.
+func MonitorAddNodes(cluster *Cluster, nodeIPAddress string) error { + timeout := 90 * time.Minute + waitContext, cancel := context.WithTimeout(cluster.Ctx, timeout) + defer cancel() + + mon, err := newAddNodeMonitor(nodeIPAddress, cluster) + if err != nil { + return err + } + + wait.Until(func() { + if !mon.status.RestAPISeen && + mon.cluster.API.Rest.IsRestAPILive() { + mon.status.RestAPISeen = true + mon.logStatus("Assisted Service API is available") + } + + if !mon.status.KubeletIsRunningOnNode && + mon.isKubeletRunningOnNode() { + mon.status.KubeletIsRunningOnNode = true + mon.logStatus("Kubelet is running") + } + + if mon.status.KubeletIsRunningOnNode && + !mon.status.FirstCSRSeen && + mon.clusterHasFirstCSRPending() { + mon.status.FirstCSRSeen = true + mon.logStatus("First CSR Pending approval") + mon.logCSRsPendingApproval(firstCSRSignerName) + } + + if mon.status.KubeletIsRunningOnNode && + !mon.status.SecondCSRSeen && + mon.clusterHasSecondCSRPending() { + mon.status.SecondCSRSeen = true + mon.logStatus("Second CSR Pending approval") + mon.logCSRsPendingApproval(secondCSRSignerName) + } + + hasJoined, isReady, err := mon.nodeHasJoinedClusterAndIsReady() + if err != nil { + logrus.Debugf("nodeHasJoinedClusterAndIsReady returned err: %v", err) + } + + if !mon.status.NodeJoinedCluster && hasJoined { + mon.status.NodeJoinedCluster = true + mon.logStatus("Node joined cluster") + } + + if !mon.status.NodeIsReady && isReady { + mon.status.NodeIsReady = true + mon.logStatus("Node is Ready") + // TODO: There appears to be a bug where the node becomes Ready + // before second CSR is approved. Log Pending CSRs for now, so users + // are aware there are still some waiting their approval even + // though the node status is Ready. + mon.logCSRsPendingApproval(secondCSRSignerName) + cancel() + } + + if mon.cluster.API.Rest.IsRestAPILive() { + _, err = cluster.MonitorStatusFromAssistedService() + if err != nil { + logrus.Warnf("Node %s: %s", nodeIPAddress, err) + } + } + }, 5*time.Second, waitContext.Done()) + + waitErr := waitContext.Err() + if waitErr != nil { + if errors.Is(waitErr, context.Canceled) { + cancel() + } + if errors.Is(waitErr, context.DeadlineExceeded) { + return errors.Wrap(waitErr, "monitor-add-nodes process timed out") + } + } + + return nil +} + +func (mon *addNodeMonitor) nodeHasJoinedClusterAndIsReady() (bool, bool, error) { + nodes, err := mon.cluster.API.Kube.ListNodes() + if err != nil { + logrus.Debugf("error getting node list %v", err) + return false, false, nil + } + + var joinedNode corev1.Node + hasJoined := false + for _, node := range nodes.Items { + for _, address := range node.Status.Addresses { + if address.Type == corev1.NodeInternalIP { + if address.Address == mon.nodeIPAddress { + joinedNode = node + hasJoined = true + } + } + } + } + + isReady := false + if hasJoined { + logrus.Debugf("Node %v (%s) has joined cluster", mon.nodeIPAddress, joinedNode.Name) + for _, cond := range joinedNode.Status.Conditions { + if cond.Type == corev1.NodeReady && cond.Status == corev1.ConditionTrue { + isReady = true + } + } + if isReady { + logrus.Debugf("Node %s (%s) is Ready", mon.nodeIPAddress, joinedNode.Name) + } else { + logrus.Debugf("Node %s (%s) is not Ready", mon.nodeIPAddress, joinedNode.Name) + } + } else { + logrus.Debugf("Node %s has not joined cluster", mon.nodeIPAddress) + } + + return hasJoined, isReady, nil +} + +func (mon *addNodeMonitor) logCSRsPendingApproval(signerName string) { + csrsPendingApproval := mon.getCSRsPendingApproval(signerName) + + for 
_, csr := range csrsPendingApproval { + mon.logStatus(fmt.Sprintf("CSR %s with signerName %s and username %s is Pending and awaiting approval", + csr.Name, csr.Spec.SignerName, csr.Spec.Username)) + } +} + +func (mon *addNodeMonitor) clusterHasFirstCSRPending() bool { + return len(mon.getCSRsPendingApproval(firstCSRSignerName)) > 0 +} + +func (mon *addNodeMonitor) clusterHasSecondCSRPending() bool { + return len(mon.getCSRsPendingApproval(secondCSRSignerName)) > 0 +} + +func (mon *addNodeMonitor) getCSRsPendingApproval(signerName string) []certificatesv1.CertificateSigningRequest { + if mon.hostnames == nil { + return []certificatesv1.CertificateSigningRequest{} + } + + csrs, err := mon.cluster.API.Kube.ListCSRs() + if err != nil { + logrus.Debugf("error calling listCSRs(): %v", err) + logrus.Infof("Cannot retrieve CSRs from Kube API. Skipping checks for pending CSRs") + return []certificatesv1.CertificateSigningRequest{} + } + + return filterCSRsMatchingHostname(signerName, csrs, mon.hostnames) +} + +func filterCSRsMatchingHostname(signerName string, csrs *certificatesv1.CertificateSigningRequestList, hostnames []string) []certificatesv1.CertificateSigningRequest { + matchedCSRs := []certificatesv1.CertificateSigningRequest{} + for _, csr := range csrs.Items { + if len(csr.Status.Conditions) > 0 { + // CSR is not Pending and not awaiting approval + continue + } + if signerName == firstCSRSignerName && csr.Spec.SignerName == firstCSRSignerName && + containsHostname(decodedFirstCSRSubject(csr.Spec.Request), hostnames) { + matchedCSRs = append(matchedCSRs, csr) + } + if signerName == secondCSRSignerName && csr.Spec.SignerName == secondCSRSignerName && + containsHostname(csr.Spec.Username, hostnames) { + matchedCSRs = append(matchedCSRs, csr) + } + } + return matchedCSRs +} + +// containsHostname checks if the searchString contains one of the node's +// hostnames. Only the first element of the hostname is checked. +// For example if the hostname is "extraworker-0.ostest.test.metalkube.org", +// "extraworker-0" is used to check if it exists in the searchString. +func containsHostname(searchString string, hostnames []string) bool { + for _, hostname := range hostnames { + parts := strings.Split(hostname, ".") + if strings.Contains(searchString, parts[0]) { + return true + } + } + return false +} + +// isKubeletRunningOnNode checks if kubelet responds +// to http. Even if kubelet responds with error like +// TLS errors, kubelet is considered running. +func (mon *addNodeMonitor) isKubeletRunningOnNode() bool { + url := fmt.Sprintf("https://%s:10250/metrics", mon.nodeIPAddress) + // http get without authentication + resp, err := http.Get(url) //nolint mon.nodeIPAddress is prevalidated to be IP address + if err != nil { + logrus.Debugf("kubelet http err: %v", err) + if strings.Contains(err.Error(), "remote error: tls: internal error") { + // nodes being added will return this error + return true + } + if strings.Contains(err.Error(), "tls: failed to verify certificate: x509: certificate signed by unknown authority") { + // existing control plane nodes returns this error + return true + } + if strings.Contains(err.Error(), "connect: no route to host") { + return false + } + } else { + logrus.Debugf("kubelet http status code: %v", resp.StatusCode) + } + return false +} + +// decodedFirstCSRSubject decodes the CSR.Spec.Request PEM block +// into readable output and returns the subject as string. 
+// +// Example of decoded request: +// Certificate Request: +// Data: +// Version: 1 (0x0) +// Subject: O = system:nodes, CN = system:node:extraworker-1 +// Subject Public Key Info: +// +// Public Key Algorithm: id-ecPublicKey +// Public-Key: (256 bit) +// pub: +// *snip* +// ASN1 OID: prime256v1 +// NIST CURVE: P-256 +// +// Attributes: +// +// a0:00 +// +// Signature Algorithm: ecdsa-with-SHA256 +// +// *snip* +func decodedFirstCSRSubject(request []byte) string { + block, _ := pem.Decode(request) + if block == nil { + return "" + } + csrDER := block.Bytes + decodedRequest, err := x509.ParseCertificateRequest(csrDER) + if err != nil { + logrus.Warn("error in x509.ParseCertificateRequest(csrDER)") + return "" + } + return decodedRequest.Subject.String() +} diff --git a/pkg/agent/monitoraddnodes_test.go b/pkg/agent/monitoraddnodes_test.go new file mode 100644 index 00000000000..0d118c544a2 --- /dev/null +++ b/pkg/agent/monitoraddnodes_test.go @@ -0,0 +1,161 @@ +package agent + +import ( + "testing" + + "github.com/stretchr/testify/assert" + certificatesv1 "k8s.io/api/certificates/v1" +) + +func TestDecodedFirstCSRSubjectContainsHostname(t *testing.T) { + firstCSRRequestForExtraworker0 := "-----BEGIN CERTIFICATE REQUEST-----\nMIH3MIGdAgEAMDsxFTATBgNVBAoTDHN5c3RlbTpub2RlczEiMCAGA1UEAxMZc3lz\ndGVtOm5vZGU6ZXh0cmF3b3JrZXItMDBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IA\nBGaK3U+3X3lM6tdgjD2b/y7Kysws8xgFW1rNd/wvKEvXzP5+A1K1M38zJiAWqKXP\n5AL2IDklO4GaO7PcRDNPabigADAKBggqhkjOPQQDAgNJADBGAiEA7C33Nym0Go73\nCZY+XOmyqE/IhaBMSwign+fgbPX1ibkCIQDHIfF7QpZReF93IW0v864/yLoXKyXy\nTGygkuR4KtXTDw==\n-----END CERTIFICATE REQUEST-----\n" + tests := []struct { + name string + hostnames []string + request string + expectedResult bool + }{ + { + name: "request contains hostname", + hostnames: []string{"extraworker-0"}, + request: firstCSRRequestForExtraworker0, + expectedResult: true, + }, + { + name: "request contains hostname using FQDN", + hostnames: []string{"extraworker-0.ostest.test.metalkube.org"}, + request: firstCSRRequestForExtraworker0, + expectedResult: true, + }, + { + name: "request contains hostname when multiple names are resolved", + hostnames: []string{"somename", "extraworker-0.ostest.test.metalkube.org"}, + request: firstCSRRequestForExtraworker0, + expectedResult: true, + }, + { + name: "request does not contain hostname", + hostnames: []string{"extraworker-1"}, + request: firstCSRRequestForExtraworker0, + expectedResult: false, + }, + { + name: "request is empty string", + hostnames: []string{"hostname-not-specified"}, + request: "", + expectedResult: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + containsHostname := containsHostname(decodedFirstCSRSubject([]byte(tt.request)), tt.hostnames) + assert.Equal(t, tt.expectedResult, containsHostname) + }) + } +} + +func TestFilterCSRsMatchingHostnames(t *testing.T) { + firstCSRRequestForExtraworker0 := "-----BEGIN CERTIFICATE REQUEST-----\nMIH3MIGdAgEAMDsxFTATBgNVBAoTDHN5c3RlbTpub2RlczEiMCAGA1UEAxMZc3lz\ndGVtOm5vZGU6ZXh0cmF3b3JrZXItMDBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IA\nBGaK3U+3X3lM6tdgjD2b/y7Kysws8xgFW1rNd/wvKEvXzP5+A1K1M38zJiAWqKXP\n5AL2IDklO4GaO7PcRDNPabigADAKBggqhkjOPQQDAgNJADBGAiEA7C33Nym0Go73\nCZY+XOmyqE/IhaBMSwign+fgbPX1ibkCIQDHIfF7QpZReF93IW0v864/yLoXKyXy\nTGygkuR4KtXTDw==\n-----END CERTIFICATE REQUEST-----\n" + + tests := []struct { + name string + csrs *certificatesv1.CertificateSigningRequestList + hostnames []string + signerName string + expectedResult []certificatesv1.CertificateSigningRequest + }{ + { + name: 
"first CSR filtering", + csrs: &certificatesv1.CertificateSigningRequestList{ + Items: []certificatesv1.CertificateSigningRequest{ + { + // should match only this one + Spec: certificatesv1.CertificateSigningRequestSpec{ + SignerName: firstCSRSignerName, + Request: []byte(firstCSRRequestForExtraworker0), + }, + }, + { + Spec: certificatesv1.CertificateSigningRequestSpec{ + SignerName: "other-request", + Request: []byte("other-request"), + }, + }, + }, + }, + hostnames: []string{"extraworker-0.ostest.test.metalkube.org"}, + signerName: "kubernetes.io/kube-apiserver-client-kubelet", + expectedResult: []certificatesv1.CertificateSigningRequest{ + { + Spec: certificatesv1.CertificateSigningRequestSpec{ + SignerName: "kubernetes.io/kube-apiserver-client-kubelet", + Request: []byte(firstCSRRequestForExtraworker0), + }, + }, + }, + }, + { + name: "second CSR filtering", + csrs: &certificatesv1.CertificateSigningRequestList{ + Items: []certificatesv1.CertificateSigningRequest{ + { + // should match only this one + Spec: certificatesv1.CertificateSigningRequestSpec{ + SignerName: secondCSRSignerName, + Username: "system:node:extraworker-0", + Request: []byte("something"), + }, + }, + { + Spec: certificatesv1.CertificateSigningRequestSpec{ + SignerName: secondCSRSignerName, + Username: "system:node:extraworker-1", + Request: []byte("something"), + }, + }, + { + Spec: certificatesv1.CertificateSigningRequestSpec{ + SignerName: "other-request", + Request: []byte("other-request"), + }, + }, + }, + }, + hostnames: []string{"extraworker-0.ostest.test.metalkube.org"}, + signerName: secondCSRSignerName, + expectedResult: []certificatesv1.CertificateSigningRequest{ + { + Spec: certificatesv1.CertificateSigningRequestSpec{ + SignerName: "kubernetes.io/kubelet-serving", + Username: "system:node:extraworker-0", + Request: []byte("something"), + }, + }, + }, + }, + { + name: "no CSRs should not result in error", + csrs: &certificatesv1.CertificateSigningRequestList{ + Items: []certificatesv1.CertificateSigningRequest{}, + }, + hostnames: []string{"extraworker-0.ostest.test.metalkube.org"}, + signerName: secondCSRSignerName, + expectedResult: []certificatesv1.CertificateSigningRequest{}, + }, + { + name: "no hostnames should not result in error", + csrs: &certificatesv1.CertificateSigningRequestList{ + Items: []certificatesv1.CertificateSigningRequest{}, + }, + hostnames: []string{}, + signerName: secondCSRSignerName, + expectedResult: []certificatesv1.CertificateSigningRequest{}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + filteredCSRs := filterCSRsMatchingHostname(tt.signerName, tt.csrs, tt.hostnames) + assert.Equal(t, tt.expectedResult, filteredCSRs) + }) + } +} diff --git a/pkg/agent/ocp.go b/pkg/agent/ocp.go index 40016c2de4b..71631121805 100644 --- a/pkg/agent/ocp.go +++ b/pkg/agent/ocp.go @@ -2,7 +2,6 @@ package agent import ( "context" - "path/filepath" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -34,12 +33,10 @@ const ( ) // NewClusterOpenShiftAPIClient Create a kube client with OCP understanding -func NewClusterOpenShiftAPIClient(ctx context.Context, assetDir string) (*ClusterOpenShiftAPIClient, error) { - +func NewClusterOpenShiftAPIClient(ctx context.Context, kubeconfigPath string) (*ClusterOpenShiftAPIClient, error) { ocpClient := &ClusterOpenShiftAPIClient{} - kubeconfigpath := filepath.Join(assetDir, "auth", "kubeconfig") - kubeconfig, err := clientcmd.BuildConfigFromFlags("", kubeconfigpath) + kubeconfig, err := clientcmd.BuildConfigFromFlags("", 
kubeconfigPath) if err != nil { return nil, errors.Wrap(err, "creating kubeconfig for ocp config client") } @@ -58,7 +55,7 @@ func NewClusterOpenShiftAPIClient(ctx context.Context, assetDir string) (*Cluste ocpClient.RouteClient = routeClient ocpClient.ctx = ctx ocpClient.config = kubeconfig - ocpClient.configPath = kubeconfigpath + ocpClient.configPath = kubeconfigPath return ocpClient, nil diff --git a/pkg/agent/rest.go b/pkg/agent/rest.go index 015bcc44629..69e08a3c3c5 100644 --- a/pkg/agent/rest.go +++ b/pkg/agent/rest.go @@ -32,8 +32,32 @@ type NodeZeroRestClient struct { } // NewNodeZeroRestClient Initialize a new rest client to interact with the Agent Rest API on node zero. -func NewNodeZeroRestClient(ctx context.Context, assetDir string) (*NodeZeroRestClient, error) { +func NewNodeZeroRestClient(ctx context.Context, rendezvousIP string, sshKey string) (*NodeZeroRestClient, error) { restClient := &NodeZeroRestClient{} + + // Get SSH Keys which can be used to determine if Rest API failures are due to network connectivity issues + if sshKey != "" { + restClient.NodeSSHKey = append(restClient.NodeSSHKey, sshKey) + } + + config := client.Config{} + config.URL = &url.URL{ + Scheme: "http", + Host: net.JoinHostPort(rendezvousIP, "8090"), + Path: client.DefaultBasePath, + } + client := client.New(config) + + restClient.Client = client + restClient.ctx = ctx + restClient.config = config + restClient.NodeZeroIP = rendezvousIP + + return restClient, nil +} + +// FindRendezvouIPAndSSHKeyFromAssetStore returns the rendezvousIP and public ssh key. +func FindRendezvouIPAndSSHKeyFromAssetStore(assetDir string) (string, string, error) { agentConfigAsset := &agentconfig.AgentConfig{} agentManifestsAsset := &manifests.AgentManifests{} installConfigAsset := &installconfig.InstallConfig{} @@ -41,7 +65,7 @@ func NewNodeZeroRestClient(ctx context.Context, assetDir string) (*NodeZeroRestC assetStore, err := assetstore.NewStore(assetDir) if err != nil { - return nil, errors.Wrap(err, "failed to create asset store") + return "", "", errors.Wrap(err, "failed to create asset store") } agentConfig, agentConfigError := assetStore.Load(agentConfigAsset) @@ -62,45 +86,33 @@ func NewNodeZeroRestClient(ctx context.Context, assetDir string) (*NodeZeroRestC logrus.Debug(errors.Wrapf(agentConfigError, "failed to load %s", agentHostsAsset.Name())) } if agentConfigError != nil || manifestError != nil || installConfigError != nil || agentHostsError != nil { - return nil, errors.New("failed to load AgentConfig, NMStateConfig, InstallConfig, or AgentHosts") + return "", "", errors.New("failed to load AgentConfig, NMStateConfig, InstallConfig, or AgentHosts") } - var RendezvousIP string + var rendezvousIP string var rendezvousIPError error var emptyNMStateConfigs []*v1beta1.NMStateConfig if agentConfig != nil && agentManifests != nil { - RendezvousIP, rendezvousIPError = image.RetrieveRendezvousIP(agentConfig.(*agentconfig.AgentConfig).Config, agentHosts.(*agentconfig.AgentHosts).Hosts, agentManifests.(*manifests.AgentManifests).NMStateConfigs) + rendezvousIP, rendezvousIPError = image.RetrieveRendezvousIP(agentConfig.(*agentconfig.AgentConfig).Config, agentHosts.(*agentconfig.AgentHosts).Hosts, agentManifests.(*manifests.AgentManifests).NMStateConfigs) } else if agentConfig == nil && agentManifests != nil { - RendezvousIP, rendezvousIPError = image.RetrieveRendezvousIP(&agent.Config{}, agentHosts.(*agentconfig.AgentHosts).Hosts, agentManifests.(*manifests.AgentManifests).NMStateConfigs) + rendezvousIP, rendezvousIPError = 
image.RetrieveRendezvousIP(&agent.Config{}, agentHosts.(*agentconfig.AgentHosts).Hosts, agentManifests.(*manifests.AgentManifests).NMStateConfigs)
 	} else if agentConfig != nil && agentManifests == nil {
-		RendezvousIP, rendezvousIPError = image.RetrieveRendezvousIP(agentConfig.(*agentconfig.AgentConfig).Config, agentHosts.(*agentconfig.AgentHosts).Hosts, emptyNMStateConfigs)
+		rendezvousIP, rendezvousIPError = image.RetrieveRendezvousIP(agentConfig.(*agentconfig.AgentConfig).Config, agentHosts.(*agentconfig.AgentHosts).Hosts, emptyNMStateConfigs)
 	} else {
-		return nil, errors.New("both AgentConfig and NMStateConfig are empty")
+		return "", "", errors.New("both AgentConfig and NMStateConfig are empty")
 	}
 	if rendezvousIPError != nil {
-		return nil, rendezvousIPError
+		return "", "", rendezvousIPError
 	}
 
+	var sshKey string
 	// Get SSH Keys which can be used to determine if Rest API failures are due to network connectivity issues
 	if installConfig != nil {
-		restClient.NodeSSHKey = append(restClient.NodeSSHKey, installConfig.(*installconfig.InstallConfig).Config.SSHKey)
-	}
-
-	config := client.Config{}
-	config.URL = &url.URL{
-		Scheme: "http",
-		Host:   net.JoinHostPort(RendezvousIP, "8090"),
-		Path:   client.DefaultBasePath,
+		sshKey = installConfig.(*installconfig.InstallConfig).Config.SSHKey
 	}
-	client := client.New(config)
-
-	restClient.Client = client
-	restClient.ctx = ctx
-	restClient.config = config
-	restClient.NodeZeroIP = RendezvousIP
-	return restClient, nil
+	return rendezvousIP, sshKey, nil
 }
 
 // IsRestAPILive Determine if the Agent Rest API on node zero has initialized
@@ -108,10 +120,7 @@ func (rest *NodeZeroRestClient) IsRestAPILive() bool {
 	// GET /v2/infraenvs
 	listInfraEnvsParams := installer.NewListInfraEnvsParams()
 	_, err := rest.Client.Installer.ListInfraEnvs(rest.ctx, listInfraEnvsParams)
-	if err != nil {
-		return false
-	}
-	return true
+	return err == nil
 }
 
 // GetRestAPIServiceBaseURL Return the url of the Agent Rest API on node zero
diff --git a/pkg/agent/validations.go b/pkg/agent/validations.go
index b123665b6b4..2df07cefc17 100644
--- a/pkg/agent/validations.go
+++ b/pkg/agent/validations.go
@@ -34,7 +34,7 @@ type validationResultHistory struct {
 	previousMessage string
 }
 
-func checkValidations(cluster *models.Cluster, validationResults *validationResults, log *logrus.Logger) error {
+func checkValidations(cluster *models.Cluster, validationResults *validationResults, log *logrus.Logger, hostLogPrefix string) error {
 	clusterLogPrefix := "Cluster validation: "
 	updatedClusterValidationHistory, err := updateValidationResultHistory(clusterLogPrefix, cluster.ValidationsInfo, validationResults.ClusterValidationHistory, log)
 	if err != nil {
@@ -43,7 +43,11 @@ func checkValidations(cluster *models.Cluster, validationResults *validationResu
 	validationResults.ClusterValidationHistory = updatedClusterValidationHistory
 
 	for _, h := range cluster.Hosts {
-		hostLogPrefix := "Host " + h.RequestedHostname + " validation: "
+		// Shadow the parameter so the per-host default does not leak into later iterations.
+		hostLogPrefix := hostLogPrefix
+		if hostLogPrefix == "" {
+			hostLogPrefix = "Host " + h.RequestedHostname + " validation: "
+		}
 		if _, ok := validationResults.HostValidationHistory[h.RequestedHostname]; !ok {
 			validationResults.HostValidationHistory[h.RequestedHostname] = make(map[string]*validationResultHistory)
 		}
diff --git a/pkg/nodejoiner/addnodes.go b/pkg/nodejoiner/addnodes.go
index 62c0d10563c..4cd156a3b9c 100644
--- a/pkg/nodejoiner/addnodes.go
+++ b/pkg/nodejoiner/addnodes.go
@@ -18,12 +18,7 @@ const (
 
 // NewAddNodesCommand creates a new command for add nodes.
 func NewAddNodesCommand(directory string, kubeConfig string) error {
-	// Store the current parameters into the assets folder, so
-	// that they could be retrieved later by the assets
-	params := joiner.Params{
-		Kubeconfig: kubeConfig,
-	}
-	err := params.Save(directory)
+	err := saveParams(directory, kubeConfig)
 	if err != nil {
 		return err
 	}
@@ -45,3 +40,12 @@ func NewAddNodesCommand(directory string, kubeConfig string) error {
 
 	return err
 }
+
+func saveParams(directory, kubeConfig string) error {
+	// Store the current parameters into the assets folder, so
+	// that they could be retrieved later by the assets
+	params := joiner.Params{
+		Kubeconfig: kubeConfig,
+	}
+	return params.Save(directory)
+}
diff --git a/pkg/nodejoiner/monitoraddnodes.go b/pkg/nodejoiner/monitoraddnodes.go
index b6219f032fb..fab9febf3eb 100644
--- a/pkg/nodejoiner/monitoraddnodes.go
+++ b/pkg/nodejoiner/monitoraddnodes.go
@@ -3,12 +3,24 @@ package nodejoiner
 import (
 	"context"
 
-	"github.com/openshift/installer/pkg/asset"
-	"github.com/openshift/installer/pkg/asset/store"
+	"github.com/sirupsen/logrus"
+
+	agentpkg "github.com/openshift/installer/pkg/agent"
+	"github.com/openshift/installer/pkg/asset/agent/workflow"
 )
 
 // NewMonitorAddNodesCommand creates a new command for monitor add nodes.
-func NewMonitorAddNodesCommand(directory string) error {
-	fetcher := store.NewAssetsFetcher(directory)
-	return fetcher.FetchAndPersist(context.Background(), []asset.WritableAsset{})
+func NewMonitorAddNodesCommand(directory, kubeconfigPath string, ips []string) error {
+	err := saveParams(directory, kubeconfigPath)
+	if err != nil {
+		return err
+	}
+
+	cluster, err := agentpkg.NewCluster(context.Background(), "", ips[0], kubeconfigPath, "", workflow.AgentWorkflowTypeAddNodes)
+	if err != nil {
+		// TODO: enumerate and return dedicated exit codes
+		logrus.Exit(1)
+	}
+
+	return agentpkg.MonitorAddNodes(cluster, ips[0])
 }
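For context, a minimal sketch (not part of the patch) of how the two call paths touched above fit together: the install workflow recovers the rendezvous IP and SSH key from the asset store, while the add-nodes workflow treats the node being added as its own rendezvous point and passes empty strings for the asset directory and SSH key. The asset directory and node IP below are hypothetical placeholders; the function signatures are the ones introduced in pkg/agent in this diff.

package main

import (
	"context"
	"path/filepath"

	"github.com/sirupsen/logrus"

	agentpkg "github.com/openshift/installer/pkg/agent"
	"github.com/openshift/installer/pkg/asset/agent/workflow"
)

func main() {
	ctx := context.Background()

	// Install workflow (cmd/openshift-install/agent/waitfor.go): rendezvous IP and
	// SSH key come from the asset store on disk.
	assetDir := "./my-cluster" // hypothetical asset directory
	kubeconfigPath := filepath.Join(assetDir, "auth", "kubeconfig")
	rendezvousIP, sshKey, err := agentpkg.FindRendezvouIPAndSSHKeyFromAssetStore(assetDir)
	if err != nil {
		logrus.Fatal(err)
	}
	installCluster, err := agentpkg.NewCluster(ctx, assetDir, rendezvousIP, kubeconfigPath, sshKey, workflow.AgentWorkflowTypeInstall)
	if err != nil {
		logrus.Fatal(err)
	}
	_ = installCluster // handed to the existing wait-for-bootstrap/install logic

	// Add-nodes workflow (pkg/nodejoiner/monitoraddnodes.go): there is no asset store,
	// and the IP of the node being added doubles as the rendezvous IP.
	nodeIP := "192.168.111.30" // hypothetical IP of the node being added
	addNodesCluster, err := agentpkg.NewCluster(ctx, "", nodeIP, kubeconfigPath, "", workflow.AgentWorkflowTypeAddNodes)
	if err != nil {
		logrus.Fatal(err)
	}
	if err := agentpkg.MonitorAddNodes(addNodesCluster, nodeIP); err != nil {
		logrus.Fatal(err)
	}
}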