From eda1c76fe5833c990edefcea9fc3521fbf5d3dac Mon Sep 17 00:00:00 2001 From: Jose Donizetti Date: Sat, 1 Feb 2020 13:03:49 -0300 Subject: [PATCH 01/14] add kic tunnel (alpha) --- cmd/minikube/cmd/tunnel.go | 28 ++++++++++-- pkg/minikube/tunnel/kic/tunnel.go | 71 +++++++++++++++++++++++++++++++ 2 files changed, 95 insertions(+), 4 deletions(-) create mode 100644 pkg/minikube/tunnel/kic/tunnel.go diff --git a/cmd/minikube/cmd/tunnel.go b/cmd/minikube/cmd/tunnel.go index 8fdcae40a6e1..7639bdfacb19 100644 --- a/cmd/minikube/cmd/tunnel.go +++ b/cmd/minikube/cmd/tunnel.go @@ -20,16 +20,23 @@ import ( "context" "os" "os/signal" + "path/filepath" + "runtime" + "strconv" "time" "github.com/golang/glog" "github.com/spf13/cobra" "github.com/spf13/viper" + + "k8s.io/minikube/pkg/drivers/kic/oci" "k8s.io/minikube/pkg/minikube/config" "k8s.io/minikube/pkg/minikube/exit" + "k8s.io/minikube/pkg/minikube/localpath" "k8s.io/minikube/pkg/minikube/machine" "k8s.io/minikube/pkg/minikube/service" "k8s.io/minikube/pkg/minikube/tunnel" + "k8s.io/minikube/pkg/minikube/tunnel/kic" ) var cleanup bool @@ -69,6 +76,23 @@ var tunnelCmd = &cobra.Command{ exit.WithError("error creating clientset", err) } + cfg, err := config.Load(viper.GetString(config.MachineProfile)) + if err != nil { + exit.WithError("Error getting config", err) + } + + if runtime.GOOS == "darwin" && cfg.VMDriver == "docker" { + // do not ignore error + port, _ := oci.HostPortBinding("docker", "minikube", 22) + sshPort := strconv.Itoa(port) + sshKey := filepath.Join(localpath.MiniPath(), "machines", cfg.Name, "id_rsa") + + kicTunnel := kic.NewTunnel(sshPort, sshKey, clientset.CoreV1()) + kicTunnel.Start() + + return + } + ctrlC := make(chan os.Signal, 1) signal.Notify(ctrlC, os.Interrupt) ctx, cancel := context.WithCancel(context.Background()) @@ -77,10 +101,6 @@ var tunnelCmd = &cobra.Command{ cancel() }() - cfg, err := config.Load(viper.GetString(config.MachineProfile)) - if err != nil { - exit.WithError("Error getting config", err) - } done, err := manager.StartTunnel(ctx, cfg.Name, api, config.DefaultLoader, clientset.CoreV1()) if err != nil { exit.WithError("error starting tunnel", err) diff --git a/pkg/minikube/tunnel/kic/tunnel.go b/pkg/minikube/tunnel/kic/tunnel.go new file mode 100644 index 000000000000..d538e4fd8a3c --- /dev/null +++ b/pkg/minikube/tunnel/kic/tunnel.go @@ -0,0 +1,71 @@ +package kic + +import ( + "fmt" + "os/exec" + "time" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + typed_core "k8s.io/client-go/kubernetes/typed/core/v1" +) + +// Tunnel ... +type Tunnel struct { + sshPort string + sshKey string + v1Core typed_core.CoreV1Interface +} + +// NewTunnel ... +func NewTunnel(sshPort, sshKey string, v1Core typed_core.CoreV1Interface) *Tunnel { + return &Tunnel{ + sshPort: sshPort, + sshKey: sshKey, + v1Core: v1Core, + } +} + +// Start ... +func (t *Tunnel) Start() error { + for { + services, err := t.v1Core.Services("").List(metav1.ListOptions{}) + if err != nil { + // do I return error, or log and continue? + return err + } + + for _, s := range services.Items { + if s.Spec.Type == v1.ServiceTypeLoadBalancer { + t.createTunnel(s.Name, s.Spec.ClusterIP, s.Spec.Ports) + } + } + + // which time to use? + time.Sleep(10 * time.Second) + } +} + +func (t *Tunnel) createTunnel(name, clusterIP string, ports []v1.ServicePort) { + sshArgs := []string{ + "-N", + "docker@127.0.0.1", + "-p", t.sshPort, + "-i", t.sshKey, + } + + for _, port := range ports { + arg := fmt.Sprintf( + "-L %d:%s:%d", + port.Port, + clusterIP, + port.Port, + ) + + sshArgs = append(sshArgs, arg) + } + + cmd := exec.Command("ssh", sshArgs...) + err := cmd.Run() + fmt.Println(err) +} From 0dfa069fd1e8c98b98fa53e040754ee9391bd0d0 Mon Sep 17 00:00:00 2001 From: Jose Donizetti Date: Wed, 5 Feb 2020 12:45:18 -0300 Subject: [PATCH 02/14] add support to multiple services - kic tunnel --- pkg/minikube/tunnel/kic/tunnel.go | 70 ++++++++++++++++++++++++++++--- 1 file changed, 64 insertions(+), 6 deletions(-) diff --git a/pkg/minikube/tunnel/kic/tunnel.go b/pkg/minikube/tunnel/kic/tunnel.go index d538e4fd8a3c..2ee7044611d1 100644 --- a/pkg/minikube/tunnel/kic/tunnel.go +++ b/pkg/minikube/tunnel/kic/tunnel.go @@ -17,6 +17,12 @@ type Tunnel struct { v1Core typed_core.CoreV1Interface } +type sshTunnel struct { + name string + kill bool + cmd *exec.Cmd +} + // NewTunnel ... func NewTunnel(sshPort, sshKey string, v1Core typed_core.CoreV1Interface) *Tunnel { return &Tunnel{ @@ -28,25 +34,46 @@ func NewTunnel(sshPort, sshKey string, v1Core typed_core.CoreV1Interface) *Tunne // Start ... func (t *Tunnel) Start() error { + sshTunnels := make(map[string]*sshTunnel) + for { services, err := t.v1Core.Services("").List(metav1.ListOptions{}) if err != nil { - // do I return error, or log and continue? return err } + for _, v := range sshTunnels { + v.kill = true + } + for _, s := range services.Items { if s.Spec.Type == v1.ServiceTypeLoadBalancer { - t.createTunnel(s.Name, s.Spec.ClusterIP, s.Spec.Ports) + sshTunnel, ok := sshTunnels[s.Name] + + if ok { + sshTunnel.kill = false + continue + } + + newSSHTunnel := t.createSSHTunnel(s.Name, s.Spec.ClusterIP, s.Spec.Ports) + sshTunnels[newSSHTunnel.name] = newSSHTunnel + } + } + + for _, v := range sshTunnels { + if v.kill { + v.stop() + delete(sshTunnels, v.name) } } // which time to use? - time.Sleep(10 * time.Second) + time.Sleep(1 * time.Second) } } -func (t *Tunnel) createTunnel(name, clusterIP string, ports []v1.ServicePort) { +func (t *Tunnel) createSSHTunnel(name, clusterIP string, ports []v1.ServicePort) *sshTunnel { + // extract sshArgs sshArgs := []string{ "-N", "docker@127.0.0.1", @@ -66,6 +93,37 @@ func (t *Tunnel) createTunnel(name, clusterIP string, ports []v1.ServicePort) { } cmd := exec.Command("ssh", sshArgs...) - err := cmd.Run() - fmt.Println(err) + + // TODO: name must be different, because if a service was changed, + // we must remove the old process and create the new one + s := &sshTunnel{ + name: fmt.Sprintf("%s", name), + kill: false, + cmd: cmd, + } + + go s.run() + + return s +} + +func (s *sshTunnel) run() { + fmt.Println("running", s.name) + err := s.cmd.Start() + if err != nil { + // TODO: improve logging + fmt.Println(err) + } + + // we are ignoring wait return, because the process will be killed, once the tunnel is not needed. + s.cmd.Wait() +} + +func (s *sshTunnel) stop() { + fmt.Println("stopping", s.name) + err := s.cmd.Process.Kill() + if err != nil { + // TODO: improve logging + fmt.Println(err) + } } From 60c6116f2c28b48e97d0b1ce05602a26d745a0fc Mon Sep 17 00:00:00 2001 From: Jose Donizetti Date: Wed, 5 Feb 2020 13:58:38 -0300 Subject: [PATCH 03/14] Extract sshTunnel - kic tunnel --- pkg/minikube/tunnel/kic/sshTunnel.go | 70 +++++++++++++++++++++++++++ pkg/minikube/tunnel/kic/tunnel.go | 72 ++-------------------------- 2 files changed, 74 insertions(+), 68 deletions(-) create mode 100644 pkg/minikube/tunnel/kic/sshTunnel.go diff --git a/pkg/minikube/tunnel/kic/sshTunnel.go b/pkg/minikube/tunnel/kic/sshTunnel.go new file mode 100644 index 000000000000..cdf5814bffe6 --- /dev/null +++ b/pkg/minikube/tunnel/kic/sshTunnel.go @@ -0,0 +1,70 @@ +package kic + +import ( + "fmt" + "os/exec" + + v1 "k8s.io/api/core/v1" +) + +type sshTunnel struct { + name string + sigkill bool + cmd *exec.Cmd +} + +func createSSHTunnel(t *Tunnel, name, clusterIP string, ports []v1.ServicePort) *sshTunnel { + // extract sshArgs + sshArgs := []string{ + "-N", + "docker@127.0.0.1", + "-p", t.sshPort, + "-i", t.sshKey, + } + + for _, port := range ports { + arg := fmt.Sprintf( + "-L %d:%s:%d", + port.Port, + clusterIP, + port.Port, + ) + + sshArgs = append(sshArgs, arg) + } + + cmd := exec.Command("ssh", sshArgs...) + + // TODO: name must be different, because if a service was changed, + // we must remove the old process and create the new one + s := &sshTunnel{ + name: name, + sigkill: false, + cmd: cmd, + } + + go s.run() + + return s +} + +func (s *sshTunnel) run() { + fmt.Println("running", s.name) + err := s.cmd.Start() + if err != nil { + // TODO: improve logging + fmt.Println(err) + } + + // we are ignoring wait return, because the process will be killed, once the tunnel is not needed. + s.cmd.Wait() +} + +func (s *sshTunnel) stop() { + fmt.Println("stopping", s.name) + err := s.cmd.Process.Kill() + if err != nil { + // TODO: improve logging + fmt.Println(err) + } +} diff --git a/pkg/minikube/tunnel/kic/tunnel.go b/pkg/minikube/tunnel/kic/tunnel.go index 2ee7044611d1..d9a363d30862 100644 --- a/pkg/minikube/tunnel/kic/tunnel.go +++ b/pkg/minikube/tunnel/kic/tunnel.go @@ -1,8 +1,6 @@ package kic import ( - "fmt" - "os/exec" "time" v1 "k8s.io/api/core/v1" @@ -17,12 +15,6 @@ type Tunnel struct { v1Core typed_core.CoreV1Interface } -type sshTunnel struct { - name string - kill bool - cmd *exec.Cmd -} - // NewTunnel ... func NewTunnel(sshPort, sshKey string, v1Core typed_core.CoreV1Interface) *Tunnel { return &Tunnel{ @@ -43,7 +35,7 @@ func (t *Tunnel) Start() error { } for _, v := range sshTunnels { - v.kill = true + v.sigkill = true } for _, s := range services.Items { @@ -51,17 +43,17 @@ func (t *Tunnel) Start() error { sshTunnel, ok := sshTunnels[s.Name] if ok { - sshTunnel.kill = false + sshTunnel.sigkill = false continue } - newSSHTunnel := t.createSSHTunnel(s.Name, s.Spec.ClusterIP, s.Spec.Ports) + newSSHTunnel := createSSHTunnel(t, s.Name, s.Spec.ClusterIP, s.Spec.Ports) sshTunnels[newSSHTunnel.name] = newSSHTunnel } } for _, v := range sshTunnels { - if v.kill { + if v.sigkill { v.stop() delete(sshTunnels, v.name) } @@ -71,59 +63,3 @@ func (t *Tunnel) Start() error { time.Sleep(1 * time.Second) } } - -func (t *Tunnel) createSSHTunnel(name, clusterIP string, ports []v1.ServicePort) *sshTunnel { - // extract sshArgs - sshArgs := []string{ - "-N", - "docker@127.0.0.1", - "-p", t.sshPort, - "-i", t.sshKey, - } - - for _, port := range ports { - arg := fmt.Sprintf( - "-L %d:%s:%d", - port.Port, - clusterIP, - port.Port, - ) - - sshArgs = append(sshArgs, arg) - } - - cmd := exec.Command("ssh", sshArgs...) - - // TODO: name must be different, because if a service was changed, - // we must remove the old process and create the new one - s := &sshTunnel{ - name: fmt.Sprintf("%s", name), - kill: false, - cmd: cmd, - } - - go s.run() - - return s -} - -func (s *sshTunnel) run() { - fmt.Println("running", s.name) - err := s.cmd.Start() - if err != nil { - // TODO: improve logging - fmt.Println(err) - } - - // we are ignoring wait return, because the process will be killed, once the tunnel is not needed. - s.cmd.Wait() -} - -func (s *sshTunnel) stop() { - fmt.Println("stopping", s.name) - err := s.cmd.Process.Kill() - if err != nil { - // TODO: improve logging - fmt.Println(err) - } -} From e0bccd044c878791ff38ea88404e0a72e9067526 Mon Sep 17 00:00:00 2001 From: Jose Donizetti Date: Wed, 5 Feb 2020 14:22:15 -0300 Subject: [PATCH 04/14] Add uniq tunnel name - kic tunnel --- pkg/minikube/tunnel/kic/tunnel.go | 40 +++++++++++++++++++++++-------- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/pkg/minikube/tunnel/kic/tunnel.go b/pkg/minikube/tunnel/kic/tunnel.go index d9a363d30862..1ae11f1d6595 100644 --- a/pkg/minikube/tunnel/kic/tunnel.go +++ b/pkg/minikube/tunnel/kic/tunnel.go @@ -1,6 +1,8 @@ package kic import ( + "fmt" + "strings" "time" v1 "k8s.io/api/core/v1" @@ -34,32 +36,50 @@ func (t *Tunnel) Start() error { return err } - for _, v := range sshTunnels { - v.sigkill = true + for _, sshTunnel := range sshTunnels { + sshTunnel.sigkill = true } for _, s := range services.Items { if s.Spec.Type == v1.ServiceTypeLoadBalancer { - sshTunnel, ok := sshTunnels[s.Name] + name := sshTunnelName(s.Name, s.Spec.ClusterIP, s.Spec.Ports) + existingSSHTunnel, ok := sshTunnels[name] if ok { - sshTunnel.sigkill = false + existingSSHTunnel.sigkill = false continue } - newSSHTunnel := createSSHTunnel(t, s.Name, s.Spec.ClusterIP, s.Spec.Ports) + newSSHTunnel := createSSHTunnel(t, name, s.Spec.ClusterIP, s.Spec.Ports) sshTunnels[newSSHTunnel.name] = newSSHTunnel } } - for _, v := range sshTunnels { - if v.sigkill { - v.stop() - delete(sshTunnels, v.name) + for _, sshTunnel := range sshTunnels { + if sshTunnel.sigkill { + sshTunnel.stop() + delete(sshTunnels, sshTunnel.name) } } - // which time to use? + // TODO: which time to use? time.Sleep(1 * time.Second) } } + +// sshTunnelName creaets a uniq name for the tunnel, using its name/clusterIP/ports. +// This allows a new process to be created if an existing service was changed, +// the new process will support the IP/Ports change ocurred. +func sshTunnelName(name, clusterIP string, ports []v1.ServicePort) string { + n := []string{ + name, + "-", + clusterIP, + } + + for _, port := range ports { + n = append(n, fmt.Sprintf("-%d", port.Port)) + } + + return strings.Join(n, "") +} From 70c0e35c443b34bdb524d9ec1245a1c8c1e2ae16 Mon Sep 17 00:00:00 2001 From: Jose Donizetti Date: Wed, 5 Feb 2020 16:24:23 -0300 Subject: [PATCH 05/14] Refactor sshTunnelName --- pkg/minikube/tunnel/kic/tunnel.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/minikube/tunnel/kic/tunnel.go b/pkg/minikube/tunnel/kic/tunnel.go index 1ae11f1d6595..0049cedc4831 100644 --- a/pkg/minikube/tunnel/kic/tunnel.go +++ b/pkg/minikube/tunnel/kic/tunnel.go @@ -42,7 +42,7 @@ func (t *Tunnel) Start() error { for _, s := range services.Items { if s.Spec.Type == v1.ServiceTypeLoadBalancer { - name := sshTunnelName(s.Name, s.Spec.ClusterIP, s.Spec.Ports) + name := sshTunnelName(s) existingSSHTunnel, ok := sshTunnels[name] if ok { @@ -70,14 +70,14 @@ func (t *Tunnel) Start() error { // sshTunnelName creaets a uniq name for the tunnel, using its name/clusterIP/ports. // This allows a new process to be created if an existing service was changed, // the new process will support the IP/Ports change ocurred. -func sshTunnelName(name, clusterIP string, ports []v1.ServicePort) string { +func sshTunnelName(service v1.Service) string { n := []string{ - name, + service.Name, "-", - clusterIP, + service.Spec.ClusterIP, } - for _, port := range ports { + for _, port := range service.Spec.Ports { n = append(n, fmt.Sprintf("-%d", port.Port)) } From 649d5c0f3f17804f8ae03b6b0ef11bf9fb8af882 Mon Sep 17 00:00:00 2001 From: Jose Donizetti Date: Wed, 5 Feb 2020 16:36:54 -0300 Subject: [PATCH 06/14] refactor createSSHTunnel --- pkg/minikube/tunnel/kic/sshTunnel.go | 16 ++++++++++------ pkg/minikube/tunnel/kic/tunnel.go | 2 +- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/pkg/minikube/tunnel/kic/sshTunnel.go b/pkg/minikube/tunnel/kic/sshTunnel.go index cdf5814bffe6..02e51822ca3e 100644 --- a/pkg/minikube/tunnel/kic/sshTunnel.go +++ b/pkg/minikube/tunnel/kic/sshTunnel.go @@ -13,20 +13,20 @@ type sshTunnel struct { cmd *exec.Cmd } -func createSSHTunnel(t *Tunnel, name, clusterIP string, ports []v1.ServicePort) *sshTunnel { +func createSSHTunnel(name, sshPort, sshKey string, svc v1.Service) *sshTunnel { // extract sshArgs sshArgs := []string{ "-N", "docker@127.0.0.1", - "-p", t.sshPort, - "-i", t.sshKey, + "-p", sshPort, + "-i", sshKey, } - for _, port := range ports { + for _, port := range svc.Spec.Ports { arg := fmt.Sprintf( "-L %d:%s:%d", port.Port, - clusterIP, + svc.Spec.ClusterIP, port.Port, ) @@ -57,7 +57,11 @@ func (s *sshTunnel) run() { } // we are ignoring wait return, because the process will be killed, once the tunnel is not needed. - s.cmd.Wait() + err = s.cmd.Wait() + if err != nil { + // TODO: improve logging + fmt.Println(err) + } } func (s *sshTunnel) stop() { diff --git a/pkg/minikube/tunnel/kic/tunnel.go b/pkg/minikube/tunnel/kic/tunnel.go index 0049cedc4831..60c586d73347 100644 --- a/pkg/minikube/tunnel/kic/tunnel.go +++ b/pkg/minikube/tunnel/kic/tunnel.go @@ -50,7 +50,7 @@ func (t *Tunnel) Start() error { continue } - newSSHTunnel := createSSHTunnel(t, name, s.Spec.ClusterIP, s.Spec.Ports) + newSSHTunnel := createSSHTunnel(name, t.sshPort, t.sshKey, s) sshTunnels[newSSHTunnel.name] = newSSHTunnel } } From 3f0fceb76957edbcf4f1b99ac23a1153c05ed837 Mon Sep 17 00:00:00 2001 From: Jose Donizetti Date: Wed, 12 Feb 2020 15:26:42 -0300 Subject: [PATCH 07/14] Add patch services --- pkg/minikube/tunnel/kic/tunnel.go | 20 +++++++--- pkg/minikube/tunnel/loadbalancer_patcher.go | 38 +++++++++++++------ .../tunnel/loadbalancer_patcher_test.go | 8 ++-- pkg/minikube/tunnel/tunnel.go | 8 ++-- 4 files changed, 48 insertions(+), 26 deletions(-) diff --git a/pkg/minikube/tunnel/kic/tunnel.go b/pkg/minikube/tunnel/kic/tunnel.go index 60c586d73347..f0a0d3e71ebf 100644 --- a/pkg/minikube/tunnel/kic/tunnel.go +++ b/pkg/minikube/tunnel/kic/tunnel.go @@ -8,21 +8,25 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" typed_core "k8s.io/client-go/kubernetes/typed/core/v1" + + "k8s.io/minikube/pkg/minikube/tunnel" ) // Tunnel ... type Tunnel struct { - sshPort string - sshKey string - v1Core typed_core.CoreV1Interface + sshPort string + sshKey string + v1Core typed_core.CoreV1Interface + LoadBalancerEmulator tunnel.LoadBalancerEmulator } // NewTunnel ... func NewTunnel(sshPort, sshKey string, v1Core typed_core.CoreV1Interface) *Tunnel { return &Tunnel{ - sshPort: sshPort, - sshKey: sshKey, - v1Core: v1Core, + sshPort: sshPort, + sshKey: sshKey, + v1Core: v1Core, + LoadBalancerEmulator: tunnel.NewLoadBalancerEmulator(v1Core), } } @@ -52,6 +56,10 @@ func (t *Tunnel) Start() error { newSSHTunnel := createSSHTunnel(name, t.sshPort, t.sshKey, s) sshTunnels[newSSHTunnel.name] = newSSHTunnel + _, err := t.LoadBalancerEmulator.PatchServicesIP("127.0.0.1") + if err != nil { + fmt.Println(err) + } } } diff --git a/pkg/minikube/tunnel/loadbalancer_patcher.go b/pkg/minikube/tunnel/loadbalancer_patcher.go index 84b3a19ba838..6cfa3f9a254b 100644 --- a/pkg/minikube/tunnel/loadbalancer_patcher.go +++ b/pkg/minikube/tunnel/loadbalancer_patcher.go @@ -37,22 +37,28 @@ type patchConverter interface { convert(restClient rest.Interface, patch *Patch) *rest.Request } -// loadBalancerEmulator is the main struct for emulating the loadbalancer behavior. it sets the ingress to the cluster IP -type loadBalancerEmulator struct { +// LoadBalancerEmulator is the main struct for emulating the loadbalancer behavior. it sets the ingress to the cluster IP +type LoadBalancerEmulator struct { coreV1Client typed_core.CoreV1Interface requestSender requestSender patchConverter patchConverter } -func (l *loadBalancerEmulator) PatchServices() ([]string, error) { +func (l *LoadBalancerEmulator) PatchServices() ([]string, error) { return l.applyOnLBServices(l.updateService) } -func (l *loadBalancerEmulator) Cleanup() ([]string, error) { +func (l *LoadBalancerEmulator) PatchServicesIP(ip string) ([]string, error) { + return l.applyOnLBServices(func(restClient rest.Interface, svc core.Service) ([]byte, error) { + return l.updateServiceIP(restClient, svc, ip) + }) +} + +func (l *LoadBalancerEmulator) Cleanup() ([]string, error) { return l.applyOnLBServices(l.cleanupService) } -func (l *loadBalancerEmulator) applyOnLBServices(action func(restClient rest.Interface, svc core.Service) ([]byte, error)) ([]string, error) { +func (l *LoadBalancerEmulator) applyOnLBServices(action func(restClient rest.Interface, svc core.Service) ([]byte, error)) ([]string, error) { services := l.coreV1Client.Services("") serviceList, err := services.List(meta.ListOptions{}) if err != nil { @@ -79,14 +85,22 @@ func (l *loadBalancerEmulator) applyOnLBServices(action func(restClient rest.Int } return managedServices, nil } -func (l *loadBalancerEmulator) updateService(restClient rest.Interface, svc core.Service) ([]byte, error) { + +func (l *LoadBalancerEmulator) updateService(restClient rest.Interface, svc core.Service) ([]byte, error) { clusterIP := svc.Spec.ClusterIP ingresses := svc.Status.LoadBalancer.Ingress if len(ingresses) == 1 && ingresses[0].IP == clusterIP { return nil, nil } + return l.updateServiceIP(restClient, svc, clusterIP) +} + +func (l *LoadBalancerEmulator) updateServiceIP(restClient rest.Interface, svc core.Service, ip string) ([]byte, error) { + if len(ip) == 0 { + return nil, nil + } glog.V(3).Infof("[%s] setting ClusterIP as the LoadBalancer Ingress", svc.Name) - jsonPatch := fmt.Sprintf(`[{"op": "add", "path": "/status/loadBalancer/ingress", "value": [ { "ip": "%s" } ] }]`, clusterIP) + jsonPatch := fmt.Sprintf(`[{"op": "add", "path": "/status/loadBalancer/ingress", "value": [ { "ip": "%s" } ] }]`, ip) patch := &Patch{ Type: types.JSONPatchType, ResourceName: svc.Name, @@ -99,14 +113,14 @@ func (l *loadBalancerEmulator) updateService(restClient rest.Interface, svc core request := l.patchConverter.convert(restClient, patch) result, err := l.requestSender.send(request) if err != nil { - glog.Errorf("error patching %s with IP %s: %s", svc.Name, clusterIP, err) + glog.Errorf("error patching %s with IP %s: %s", svc.Name, ip, err) } else { - glog.Infof("Patched %s with IP %s", svc.Name, clusterIP) + glog.Infof("Patched %s with IP %s", svc.Name, ip) } return result, err } -func (l *loadBalancerEmulator) cleanupService(restClient rest.Interface, svc core.Service) ([]byte, error) { +func (l *LoadBalancerEmulator) cleanupService(restClient rest.Interface, svc core.Service) ([]byte, error) { ingresses := svc.Status.LoadBalancer.Ingress if len(ingresses) == 0 { return nil, nil @@ -129,8 +143,8 @@ func (l *loadBalancerEmulator) cleanupService(restClient rest.Interface, svc cor } -func newLoadBalancerEmulator(corev1Client typed_core.CoreV1Interface) loadBalancerEmulator { - return loadBalancerEmulator{ +func NewLoadBalancerEmulator(corev1Client typed_core.CoreV1Interface) LoadBalancerEmulator { + return LoadBalancerEmulator{ coreV1Client: corev1Client, requestSender: &defaultRequestSender{}, patchConverter: &defaultPatchConverter{}, diff --git a/pkg/minikube/tunnel/loadbalancer_patcher_test.go b/pkg/minikube/tunnel/loadbalancer_patcher_test.go index 3143f4cb7d0b..1e6788f90d39 100644 --- a/pkg/minikube/tunnel/loadbalancer_patcher_test.go +++ b/pkg/minikube/tunnel/loadbalancer_patcher_test.go @@ -87,7 +87,7 @@ func TestEmptyListOfServicesDoesNothing(t *testing.T) { client := newStubCoreClient(&core.ServiceList{ Items: []core.Service{}}) - patcher := newLoadBalancerEmulator(client) + patcher := NewLoadBalancerEmulator(client) serviceNames, err := patcher.PatchServices() @@ -113,7 +113,7 @@ func TestServicesWithNoLoadbalancerType(t *testing.T) { }, }) - patcher := newLoadBalancerEmulator(client) + patcher := NewLoadBalancerEmulator(client) serviceNames, err := patcher.PatchServices() @@ -214,7 +214,7 @@ func TestServicesWithLoadbalancerType(t *testing.T) { requestSender := &countingRequestSender{} patchConverter := &recordingPatchConverter{} - patcher := newLoadBalancerEmulator(client) + patcher := NewLoadBalancerEmulator(client) patcher.requestSender = requestSender patcher.patchConverter = patchConverter @@ -328,7 +328,7 @@ func TestCleanupPatchedIPs(t *testing.T) { requestSender := &countingRequestSender{} patchConverter := &recordingPatchConverter{} - patcher := newLoadBalancerEmulator(client) + patcher := NewLoadBalancerEmulator(client) patcher.requestSender = requestSender patcher.patchConverter = patchConverter diff --git a/pkg/minikube/tunnel/tunnel.go b/pkg/minikube/tunnel/tunnel.go index ab83bed1d385..375f032057ce 100644 --- a/pkg/minikube/tunnel/tunnel.go +++ b/pkg/minikube/tunnel/tunnel.go @@ -72,7 +72,7 @@ func newTunnel(machineName string, machineAPI libmachine.API, configLoader confi clusterInspector: ci, router: router, registry: registry, - loadBalancerEmulator: newLoadBalancerEmulator(v1Core), + LoadBalancerEmulator: NewLoadBalancerEmulator(v1Core), status: &Status{ TunnelID: id, MinikubeState: state, @@ -88,7 +88,7 @@ type tunnel struct { // collaborators clusterInspector *clusterInspector router router - loadBalancerEmulator loadBalancerEmulator + LoadBalancerEmulator LoadBalancerEmulator reporter reporter registry *persistentRegistry @@ -108,7 +108,7 @@ func (t *tunnel) cleanup() *Status { } } if t.status.MinikubeState == Running { - t.status.PatchedServices, t.status.LoadBalancerEmulatorError = t.loadBalancerEmulator.Cleanup() + t.status.PatchedServices, t.status.LoadBalancerEmulatorError = t.LoadBalancerEmulator.Cleanup() } return t.status } @@ -122,7 +122,7 @@ func (t *tunnel) update() *Status { glog.V(3).Infof("minikube is running, trying to add route%s", t.status.TunnelID.Route) setupRoute(t, h) if t.status.RouteError == nil { - t.status.PatchedServices, t.status.LoadBalancerEmulatorError = t.loadBalancerEmulator.PatchServices() + t.status.PatchedServices, t.status.LoadBalancerEmulatorError = t.LoadBalancerEmulator.PatchServices() } } glog.V(3).Infof("sending report %s", t.status) From 073558e21eac6d82fd04c22848a4abd7432a8df2 Mon Sep 17 00:00:00 2001 From: Jose Donizetti Date: Fri, 14 Feb 2020 12:43:26 -0300 Subject: [PATCH 08/14] Add cleanup to kic tunnel --- pkg/minikube/tunnel/kic/tunnel.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/pkg/minikube/tunnel/kic/tunnel.go b/pkg/minikube/tunnel/kic/tunnel.go index f0a0d3e71ebf..195f50b82452 100644 --- a/pkg/minikube/tunnel/kic/tunnel.go +++ b/pkg/minikube/tunnel/kic/tunnel.go @@ -1,6 +1,7 @@ package kic import ( + "context" "fmt" "strings" "time" @@ -14,6 +15,7 @@ import ( // Tunnel ... type Tunnel struct { + ctx context.Context sshPort string sshKey string v1Core typed_core.CoreV1Interface @@ -21,8 +23,9 @@ type Tunnel struct { } // NewTunnel ... -func NewTunnel(sshPort, sshKey string, v1Core typed_core.CoreV1Interface) *Tunnel { +func NewTunnel(ctx context.Context, sshPort, sshKey string, v1Core typed_core.CoreV1Interface) *Tunnel { return &Tunnel{ + ctx: ctx, sshPort: sshPort, sshKey: sshKey, v1Core: v1Core, @@ -35,6 +38,16 @@ func (t *Tunnel) Start() error { sshTunnels := make(map[string]*sshTunnel) for { + select { + case <-t.ctx.Done(): + _, err := t.LoadBalancerEmulator.Cleanup() + if err != nil { + fmt.Println(err) + } + return nil + default: + } + services, err := t.v1Core.Services("").List(metav1.ListOptions{}) if err != nil { return err @@ -56,6 +69,8 @@ func (t *Tunnel) Start() error { newSSHTunnel := createSSHTunnel(name, t.sshPort, t.sshKey, s) sshTunnels[newSSHTunnel.name] = newSSHTunnel + + // PatchServicesIP maybe can receive the svc here, instead of listing inside again _, err := t.LoadBalancerEmulator.PatchServicesIP("127.0.0.1") if err != nil { fmt.Println(err) From c421e156b6854ec032ac64546772cb0e2bfcdf57 Mon Sep 17 00:00:00 2001 From: Jose Donizetti Date: Fri, 14 Feb 2020 12:51:04 -0300 Subject: [PATCH 09/14] Fix tunnel cmd to pass context to kic tunnel --- cmd/minikube/cmd/tunnel.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/cmd/minikube/cmd/tunnel.go b/cmd/minikube/cmd/tunnel.go index 7639bdfacb19..f1b23fcecfea 100644 --- a/cmd/minikube/cmd/tunnel.go +++ b/cmd/minikube/cmd/tunnel.go @@ -81,26 +81,26 @@ var tunnelCmd = &cobra.Command{ exit.WithError("Error getting config", err) } + ctrlC := make(chan os.Signal, 1) + signal.Notify(ctrlC, os.Interrupt) + ctx, cancel := context.WithCancel(context.Background()) + go func() { + <-ctrlC + cancel() + }() + if runtime.GOOS == "darwin" && cfg.VMDriver == "docker" { // do not ignore error port, _ := oci.HostPortBinding("docker", "minikube", 22) sshPort := strconv.Itoa(port) sshKey := filepath.Join(localpath.MiniPath(), "machines", cfg.Name, "id_rsa") - kicTunnel := kic.NewTunnel(sshPort, sshKey, clientset.CoreV1()) + kicTunnel := kic.NewTunnel(ctx, sshPort, sshKey, clientset.CoreV1()) kicTunnel.Start() return } - ctrlC := make(chan os.Signal, 1) - signal.Notify(ctrlC, os.Interrupt) - ctx, cancel := context.WithCancel(context.Background()) - go func() { - <-ctrlC - cancel() - }() - done, err := manager.StartTunnel(ctx, cfg.Name, api, config.DefaultLoader, clientset.CoreV1()) if err != nil { exit.WithError("error starting tunnel", err) From 08b0b654a0845dd05a8f922e68d8cab2b9f21f9e Mon Sep 17 00:00:00 2001 From: Jose Donizetti Date: Fri, 14 Feb 2020 13:01:43 -0300 Subject: [PATCH 10/14] Refactor PatchServicesIP --- pkg/minikube/tunnel/kic/tunnel.go | 5 +++-- pkg/minikube/tunnel/loadbalancer_patcher.go | 8 ++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/pkg/minikube/tunnel/kic/tunnel.go b/pkg/minikube/tunnel/kic/tunnel.go index 195f50b82452..4de528f77327 100644 --- a/pkg/minikube/tunnel/kic/tunnel.go +++ b/pkg/minikube/tunnel/kic/tunnel.go @@ -37,6 +37,8 @@ func NewTunnel(ctx context.Context, sshPort, sshKey string, v1Core typed_core.Co func (t *Tunnel) Start() error { sshTunnels := make(map[string]*sshTunnel) + restClient := t.v1Core.RESTClient() + for { select { case <-t.ctx.Done(): @@ -70,8 +72,7 @@ func (t *Tunnel) Start() error { newSSHTunnel := createSSHTunnel(name, t.sshPort, t.sshKey, s) sshTunnels[newSSHTunnel.name] = newSSHTunnel - // PatchServicesIP maybe can receive the svc here, instead of listing inside again - _, err := t.LoadBalancerEmulator.PatchServicesIP("127.0.0.1") + err := t.LoadBalancerEmulator.PatchServiceIP(restClient, s, "127.0.0.1") if err != nil { fmt.Println(err) } diff --git a/pkg/minikube/tunnel/loadbalancer_patcher.go b/pkg/minikube/tunnel/loadbalancer_patcher.go index 6cfa3f9a254b..e328e132f93c 100644 --- a/pkg/minikube/tunnel/loadbalancer_patcher.go +++ b/pkg/minikube/tunnel/loadbalancer_patcher.go @@ -48,10 +48,10 @@ func (l *LoadBalancerEmulator) PatchServices() ([]string, error) { return l.applyOnLBServices(l.updateService) } -func (l *LoadBalancerEmulator) PatchServicesIP(ip string) ([]string, error) { - return l.applyOnLBServices(func(restClient rest.Interface, svc core.Service) ([]byte, error) { - return l.updateServiceIP(restClient, svc, ip) - }) +func (l *LoadBalancerEmulator) PatchServiceIP(restClient rest.Interface, svc core.Service, ip string) error { + // TODO: do not ignore result + _, err := l.updateServiceIP(restClient, svc, ip) + return err } func (l *LoadBalancerEmulator) Cleanup() ([]string, error) { From 0d11e4e3480969259582d015ea418728cbf6c236 Mon Sep 17 00:00:00 2001 From: Jose Donizetti Date: Fri, 14 Feb 2020 13:22:56 -0300 Subject: [PATCH 11/14] Add error checking to kic cmd/tunnel --- cmd/minikube/cmd/tunnel.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/cmd/minikube/cmd/tunnel.go b/cmd/minikube/cmd/tunnel.go index f1b23fcecfea..b089fb031a65 100644 --- a/cmd/minikube/cmd/tunnel.go +++ b/cmd/minikube/cmd/tunnel.go @@ -90,13 +90,18 @@ var tunnelCmd = &cobra.Command{ }() if runtime.GOOS == "darwin" && cfg.VMDriver == "docker" { - // do not ignore error - port, _ := oci.HostPortBinding("docker", "minikube", 22) + port, err := oci.HostPortBinding("docker", "minikube", 22) + if err != nil { + exit.WithError("error getting ssh port", err) + } sshPort := strconv.Itoa(port) sshKey := filepath.Join(localpath.MiniPath(), "machines", cfg.Name, "id_rsa") kicTunnel := kic.NewTunnel(ctx, sshPort, sshKey, clientset.CoreV1()) - kicTunnel.Start() + err = kicTunnel.Start() + if err != nil { + exit.WithError("error starting tunnel", err) + } return } From b0d19e51b5831b0ddb7f06169c21c77f0ccc86cd Mon Sep 17 00:00:00 2001 From: Jose Donizetti Date: Fri, 14 Feb 2020 21:01:22 -0300 Subject: [PATCH 12/14] Refactor kic ssh tunnel --- cmd/minikube/cmd/tunnel.go | 6 +- .../tunnel/kic/{sshTunnel.go => ssh_conn.go} | 33 +++-- pkg/minikube/tunnel/kic/ssh_tunnel.go | 123 ++++++++++++++++++ pkg/minikube/tunnel/kic/tunnel.go | 109 ---------------- 4 files changed, 142 insertions(+), 129 deletions(-) rename pkg/minikube/tunnel/kic/{sshTunnel.go => ssh_conn.go} (58%) create mode 100644 pkg/minikube/tunnel/kic/ssh_tunnel.go delete mode 100644 pkg/minikube/tunnel/kic/tunnel.go diff --git a/cmd/minikube/cmd/tunnel.go b/cmd/minikube/cmd/tunnel.go index b089fb031a65..35baf5eb7461 100644 --- a/cmd/minikube/cmd/tunnel.go +++ b/cmd/minikube/cmd/tunnel.go @@ -21,7 +21,7 @@ import ( "os" "os/signal" "path/filepath" - "runtime" + //"runtime" "strconv" "time" @@ -97,8 +97,8 @@ var tunnelCmd = &cobra.Command{ sshPort := strconv.Itoa(port) sshKey := filepath.Join(localpath.MiniPath(), "machines", cfg.Name, "id_rsa") - kicTunnel := kic.NewTunnel(ctx, sshPort, sshKey, clientset.CoreV1()) - err = kicTunnel.Start() + kicSSHTunnel := kic.NewSSHTunnel(ctx, sshPort, sshKey, clientset.CoreV1()) + err = kicSSHTunnel.Start() if err != nil { exit.WithError("error starting tunnel", err) } diff --git a/pkg/minikube/tunnel/kic/sshTunnel.go b/pkg/minikube/tunnel/kic/ssh_conn.go similarity index 58% rename from pkg/minikube/tunnel/kic/sshTunnel.go rename to pkg/minikube/tunnel/kic/ssh_conn.go index 02e51822ca3e..3fe34b411c9e 100644 --- a/pkg/minikube/tunnel/kic/sshTunnel.go +++ b/pkg/minikube/tunnel/kic/ssh_conn.go @@ -7,13 +7,13 @@ import ( v1 "k8s.io/api/core/v1" ) -type sshTunnel struct { +type sshConn struct { name string sigkill bool cmd *exec.Cmd } -func createSSHTunnel(name, sshPort, sshKey string, svc v1.Service) *sshTunnel { +func createSSHConn(name, sshPort, sshKey string, svc v1.Service) *sshConn { // extract sshArgs sshArgs := []string{ "-N", @@ -37,38 +37,37 @@ func createSSHTunnel(name, sshPort, sshKey string, svc v1.Service) *sshTunnel { // TODO: name must be different, because if a service was changed, // we must remove the old process and create the new one - s := &sshTunnel{ + s := &sshConn{ name: name, sigkill: false, cmd: cmd, } + // TODO: create should not run go s.run() return s } -func (s *sshTunnel) run() { - fmt.Println("running", s.name) - err := s.cmd.Start() +func (c *sshConn) run() error { + fmt.Println("running", c.name) + err := c.cmd.Start() if err != nil { - // TODO: improve logging - fmt.Println(err) + return err } - // we are ignoring wait return, because the process will be killed, once the tunnel is not needed. - err = s.cmd.Wait() + // TODO: can we ignore kills and returns not kills? + // kilss we have to log nonetheless + err = c.cmd.Wait() if err != nil { // TODO: improve logging fmt.Println(err) } + + return nil } -func (s *sshTunnel) stop() { - fmt.Println("stopping", s.name) - err := s.cmd.Process.Kill() - if err != nil { - // TODO: improve logging - fmt.Println(err) - } +func (c *sshConn) stop() error { + fmt.Println("stopping", c.name) + return c.cmd.Process.Kill() } diff --git a/pkg/minikube/tunnel/kic/ssh_tunnel.go b/pkg/minikube/tunnel/kic/ssh_tunnel.go new file mode 100644 index 000000000000..ffe61d2ff6ec --- /dev/null +++ b/pkg/minikube/tunnel/kic/ssh_tunnel.go @@ -0,0 +1,123 @@ +package kic + +import ( + "context" + "fmt" + "strings" + "time" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + typed_core "k8s.io/client-go/kubernetes/typed/core/v1" + + "k8s.io/minikube/pkg/minikube/tunnel" +) + +// SSHTunnel ... +type SSHTunnel struct { + ctx context.Context + sshPort string + sshKey string + v1Core typed_core.CoreV1Interface + LoadBalancerEmulator tunnel.LoadBalancerEmulator + conns map[string]*sshConn + connsToStop map[string]*sshConn +} + +// NewSSHTunnel ... +func NewSSHTunnel(ctx context.Context, sshPort, sshKey string, v1Core typed_core.CoreV1Interface) *SSHTunnel { + return &SSHTunnel{ + ctx: ctx, + sshPort: sshPort, + sshKey: sshKey, + v1Core: v1Core, + LoadBalancerEmulator: tunnel.NewLoadBalancerEmulator(v1Core), + conns: make(map[string]*sshConn), + connsToStop: make(map[string]*sshConn), + } +} + +// Start ... +func (t *SSHTunnel) Start() error { + for { + select { + case <-t.ctx.Done(): + // TODO: extrac to a func + _, err := t.LoadBalancerEmulator.Cleanup() + if err != nil { + fmt.Println(err) + } + return nil + default: + } + + services, err := t.v1Core.Services("").List(metav1.ListOptions{}) + if err != nil { + return err + } + + t.markConnectionsToBeStopped() + + for _, svc := range services.Items { + if svc.Spec.Type == v1.ServiceTypeLoadBalancer { + t.startConnection(svc) + } + } + + t.stopMarkedConnections() + + // TODO: which time to use? + time.Sleep(1 * time.Second) + } +} + +func (t *SSHTunnel) markConnectionsToBeStopped() { + for _, conn := range t.conns { + t.connsToStop[conn.name] = conn + } +} + +func (t *SSHTunnel) startConnection(svc v1.Service) error { + uniqName := sshConnUniqName(svc) + existingSSHConn, ok := t.conns[uniqName] + + if ok { + // if the svc still exist we remove the conn from the stopping list + delete(t.connsToStop, existingSSHConn.name) + return nil + } + + // create new ssh conn + newSSHConn := createSSHConn(uniqName, t.sshPort, t.sshKey, svc) + t.conns[newSSHConn.name] = newSSHConn + + return t.LoadBalancerEmulator.PatchServiceIP(t.v1Core.RESTClient(), svc, "127.0.0.1") +} + +func (t *SSHTunnel) stopMarkedConnections() { + for _, sshConn := range t.connsToStop { + err := sshConn.stop() + if err != nil { + // do something + } + delete(t.conns, sshConn.name) + delete(t.connsToStop, sshConn.name) + } +} + +// sshConnName creates a uniq name for the tunnel, using its name/clusterIP/ports. +// This allows a new process to be created if an existing service was changed, +// the new process will support the IP/Ports change ocurred. +func sshConnUniqName(service v1.Service) string { + n := []string{ + service.Name, + "-", + service.Spec.ClusterIP, + } + + for _, port := range service.Spec.Ports { + n = append(n, fmt.Sprintf("-%d", port.Port)) + } + + return strings.Join(n, "") +} diff --git a/pkg/minikube/tunnel/kic/tunnel.go b/pkg/minikube/tunnel/kic/tunnel.go deleted file mode 100644 index 4de528f77327..000000000000 --- a/pkg/minikube/tunnel/kic/tunnel.go +++ /dev/null @@ -1,109 +0,0 @@ -package kic - -import ( - "context" - "fmt" - "strings" - "time" - - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - typed_core "k8s.io/client-go/kubernetes/typed/core/v1" - - "k8s.io/minikube/pkg/minikube/tunnel" -) - -// Tunnel ... -type Tunnel struct { - ctx context.Context - sshPort string - sshKey string - v1Core typed_core.CoreV1Interface - LoadBalancerEmulator tunnel.LoadBalancerEmulator -} - -// NewTunnel ... -func NewTunnel(ctx context.Context, sshPort, sshKey string, v1Core typed_core.CoreV1Interface) *Tunnel { - return &Tunnel{ - ctx: ctx, - sshPort: sshPort, - sshKey: sshKey, - v1Core: v1Core, - LoadBalancerEmulator: tunnel.NewLoadBalancerEmulator(v1Core), - } -} - -// Start ... -func (t *Tunnel) Start() error { - sshTunnels := make(map[string]*sshTunnel) - - restClient := t.v1Core.RESTClient() - - for { - select { - case <-t.ctx.Done(): - _, err := t.LoadBalancerEmulator.Cleanup() - if err != nil { - fmt.Println(err) - } - return nil - default: - } - - services, err := t.v1Core.Services("").List(metav1.ListOptions{}) - if err != nil { - return err - } - - for _, sshTunnel := range sshTunnels { - sshTunnel.sigkill = true - } - - for _, s := range services.Items { - if s.Spec.Type == v1.ServiceTypeLoadBalancer { - name := sshTunnelName(s) - existingSSHTunnel, ok := sshTunnels[name] - - if ok { - existingSSHTunnel.sigkill = false - continue - } - - newSSHTunnel := createSSHTunnel(name, t.sshPort, t.sshKey, s) - sshTunnels[newSSHTunnel.name] = newSSHTunnel - - err := t.LoadBalancerEmulator.PatchServiceIP(restClient, s, "127.0.0.1") - if err != nil { - fmt.Println(err) - } - } - } - - for _, sshTunnel := range sshTunnels { - if sshTunnel.sigkill { - sshTunnel.stop() - delete(sshTunnels, sshTunnel.name) - } - } - - // TODO: which time to use? - time.Sleep(1 * time.Second) - } -} - -// sshTunnelName creaets a uniq name for the tunnel, using its name/clusterIP/ports. -// This allows a new process to be created if an existing service was changed, -// the new process will support the IP/Ports change ocurred. -func sshTunnelName(service v1.Service) string { - n := []string{ - service.Name, - "-", - service.Spec.ClusterIP, - } - - for _, port := range service.Spec.Ports { - n = append(n, fmt.Sprintf("-%d", port.Port)) - } - - return strings.Join(n, "") -} From 2bfcb72b9678ef9564a347db171b7e93fda56710 Mon Sep 17 00:00:00 2001 From: Jose Donizetti Date: Mon, 17 Feb 2020 10:22:11 -0300 Subject: [PATCH 13/14] Remove not used code --- cmd/minikube/cmd/tunnel.go | 2 +- pkg/minikube/tunnel/kic/ssh_conn.go | 10 ++++------ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/cmd/minikube/cmd/tunnel.go b/cmd/minikube/cmd/tunnel.go index 35baf5eb7461..e1de7e8e1226 100644 --- a/cmd/minikube/cmd/tunnel.go +++ b/cmd/minikube/cmd/tunnel.go @@ -21,7 +21,7 @@ import ( "os" "os/signal" "path/filepath" - //"runtime" + "runtime" "strconv" "time" diff --git a/pkg/minikube/tunnel/kic/ssh_conn.go b/pkg/minikube/tunnel/kic/ssh_conn.go index 3fe34b411c9e..ce120d889cbd 100644 --- a/pkg/minikube/tunnel/kic/ssh_conn.go +++ b/pkg/minikube/tunnel/kic/ssh_conn.go @@ -8,9 +8,8 @@ import ( ) type sshConn struct { - name string - sigkill bool - cmd *exec.Cmd + name string + cmd *exec.Cmd } func createSSHConn(name, sshPort, sshKey string, svc v1.Service) *sshConn { @@ -38,9 +37,8 @@ func createSSHConn(name, sshPort, sshKey string, svc v1.Service) *sshConn { // TODO: name must be different, because if a service was changed, // we must remove the old process and create the new one s := &sshConn{ - name: name, - sigkill: false, - cmd: cmd, + name: name, + cmd: cmd, } // TODO: create should not run From b201d4b5c9b2a258dc3fec36beff4e6d6bb3bb13 Mon Sep 17 00:00:00 2001 From: Jose Donizetti Date: Tue, 18 Feb 2020 13:29:56 -0300 Subject: [PATCH 14/14] Add glog logging --- pkg/minikube/tunnel/kic/ssh_conn.go | 28 ++++----------------------- pkg/minikube/tunnel/kic/ssh_tunnel.go | 27 ++++++++++++++++++-------- 2 files changed, 23 insertions(+), 32 deletions(-) diff --git a/pkg/minikube/tunnel/kic/ssh_conn.go b/pkg/minikube/tunnel/kic/ssh_conn.go index ce120d889cbd..c8a85c197457 100644 --- a/pkg/minikube/tunnel/kic/ssh_conn.go +++ b/pkg/minikube/tunnel/kic/ssh_conn.go @@ -34,38 +34,18 @@ func createSSHConn(name, sshPort, sshKey string, svc v1.Service) *sshConn { cmd := exec.Command("ssh", sshArgs...) - // TODO: name must be different, because if a service was changed, - // we must remove the old process and create the new one - s := &sshConn{ + return &sshConn{ name: name, cmd: cmd, } - - // TODO: create should not run - go s.run() - - return s } func (c *sshConn) run() error { - fmt.Println("running", c.name) - err := c.cmd.Start() - if err != nil { - return err - } - - // TODO: can we ignore kills and returns not kills? - // kilss we have to log nonetheless - err = c.cmd.Wait() - if err != nil { - // TODO: improve logging - fmt.Println(err) - } - - return nil + fmt.Printf("starting tunnel for %s\n", c.name) + return c.cmd.Run() } func (c *sshConn) stop() error { - fmt.Println("stopping", c.name) + fmt.Printf("stopping tunnel for %s\n", c.name) return c.cmd.Process.Kill() } diff --git a/pkg/minikube/tunnel/kic/ssh_tunnel.go b/pkg/minikube/tunnel/kic/ssh_tunnel.go index ffe61d2ff6ec..eba12ea34b50 100644 --- a/pkg/minikube/tunnel/kic/ssh_tunnel.go +++ b/pkg/minikube/tunnel/kic/ssh_tunnel.go @@ -6,6 +6,7 @@ import ( "strings" "time" + "github.com/golang/glog" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" typed_core "k8s.io/client-go/kubernetes/typed/core/v1" @@ -42,18 +43,17 @@ func (t *SSHTunnel) Start() error { for { select { case <-t.ctx.Done(): - // TODO: extrac to a func _, err := t.LoadBalancerEmulator.Cleanup() if err != nil { - fmt.Println(err) + glog.Errorf("error cleaning up: %v", err) } - return nil + return err default: } services, err := t.v1Core.Services("").List(metav1.ListOptions{}) if err != nil { - return err + glog.Errorf("error listing services: %v", err) } t.markConnectionsToBeStopped() @@ -77,28 +77,39 @@ func (t *SSHTunnel) markConnectionsToBeStopped() { } } -func (t *SSHTunnel) startConnection(svc v1.Service) error { +func (t *SSHTunnel) startConnection(svc v1.Service) { uniqName := sshConnUniqName(svc) existingSSHConn, ok := t.conns[uniqName] if ok { // if the svc still exist we remove the conn from the stopping list delete(t.connsToStop, existingSSHConn.name) - return nil + return } // create new ssh conn newSSHConn := createSSHConn(uniqName, t.sshPort, t.sshKey, svc) t.conns[newSSHConn.name] = newSSHConn - return t.LoadBalancerEmulator.PatchServiceIP(t.v1Core.RESTClient(), svc, "127.0.0.1") + go func() { + err := newSSHConn.run() + if err != nil { + glog.Errorf("error starting ssh tunnel: %v", err) + } + + }() + + err := t.LoadBalancerEmulator.PatchServiceIP(t.v1Core.RESTClient(), svc, "127.0.0.1") + if err != nil { + glog.Errorf("error patching service: %v", err) + } } func (t *SSHTunnel) stopMarkedConnections() { for _, sshConn := range t.connsToStop { err := sshConn.stop() if err != nil { - // do something + glog.Errorf("error stopping ssh tunnel: %v", err) } delete(t.conns, sshConn.name) delete(t.connsToStop, sshConn.name)