Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add tunnel for docker driver on darwin #6460

Merged
merged 14 commits into from
Feb 24, 2020
31 changes: 28 additions & 3 deletions cmd/minikube/cmd/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,23 @@ import (
"context"
"os"
"os/signal"
"path/filepath"
"runtime"
"strconv"
"time"

"github.com/golang/glog"
"github.com/spf13/cobra"
"github.com/spf13/viper"

"k8s.io/minikube/pkg/drivers/kic/oci"
"k8s.io/minikube/pkg/minikube/config"
"k8s.io/minikube/pkg/minikube/exit"
"k8s.io/minikube/pkg/minikube/localpath"
"k8s.io/minikube/pkg/minikube/machine"
"k8s.io/minikube/pkg/minikube/service"
"k8s.io/minikube/pkg/minikube/tunnel"
"k8s.io/minikube/pkg/minikube/tunnel/kic"
)

var cleanup bool
Expand Down Expand Up @@ -69,6 +76,11 @@ var tunnelCmd = &cobra.Command{
exit.WithError("error creating clientset", err)
}

cfg, err := config.Load(viper.GetString(config.MachineProfile))
if err != nil {
exit.WithError("Error getting config", err)
}

ctrlC := make(chan os.Signal, 1)
signal.Notify(ctrlC, os.Interrupt)
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -77,10 +89,23 @@ var tunnelCmd = &cobra.Command{
cancel()
}()

cfg, err := config.Load(viper.GetString(config.MachineProfile))
if err != nil {
exit.WithError("Error getting config", err)
if runtime.GOOS == "darwin" && cfg.VMDriver == "docker" {
port, err := oci.HostPortBinding("docker", "minikube", 22)
if err != nil {
exit.WithError("error getting ssh port", err)
}
sshPort := strconv.Itoa(port)
sshKey := filepath.Join(localpath.MiniPath(), "machines", cfg.Name, "id_rsa")

kicSSHTunnel := kic.NewSSHTunnel(ctx, sshPort, sshKey, clientset.CoreV1())
err = kicSSHTunnel.Start()
if err != nil {
exit.WithError("error starting tunnel", err)
}

return
}

done, err := manager.StartTunnel(ctx, cfg.Name, api, config.DefaultLoader, clientset.CoreV1())
if err != nil {
exit.WithError("error starting tunnel", err)
Expand Down
51 changes: 51 additions & 0 deletions pkg/minikube/tunnel/kic/ssh_conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package kic

import (
"fmt"
"os/exec"

v1 "k8s.io/api/core/v1"
)

type sshConn struct {
name string
cmd *exec.Cmd
}

func createSSHConn(name, sshPort, sshKey string, svc v1.Service) *sshConn {
// extract sshArgs
sshArgs := []string{
"-N",
"[email protected]",
"-p", sshPort,
"-i", sshKey,
}

for _, port := range svc.Spec.Ports {
arg := fmt.Sprintf(
"-L %d:%s:%d",
port.Port,
svc.Spec.ClusterIP,
port.Port,
)

sshArgs = append(sshArgs, arg)
}

cmd := exec.Command("ssh", sshArgs...)

return &sshConn{
name: name,
cmd: cmd,
}
}

func (c *sshConn) run() error {
fmt.Printf("starting tunnel for %s\n", c.name)
return c.cmd.Run()
}

func (c *sshConn) stop() error {
fmt.Printf("stopping tunnel for %s\n", c.name)
return c.cmd.Process.Kill()
}
134 changes: 134 additions & 0 deletions pkg/minikube/tunnel/kic/ssh_tunnel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package kic

import (
"context"
"fmt"
"strings"
"time"

"github.com/golang/glog"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
typed_core "k8s.io/client-go/kubernetes/typed/core/v1"

"k8s.io/minikube/pkg/minikube/tunnel"
)

// SSHTunnel ...
type SSHTunnel struct {
ctx context.Context
sshPort string
sshKey string
v1Core typed_core.CoreV1Interface
LoadBalancerEmulator tunnel.LoadBalancerEmulator
conns map[string]*sshConn
connsToStop map[string]*sshConn
}

// NewSSHTunnel ...
func NewSSHTunnel(ctx context.Context, sshPort, sshKey string, v1Core typed_core.CoreV1Interface) *SSHTunnel {
return &SSHTunnel{
ctx: ctx,
sshPort: sshPort,
sshKey: sshKey,
v1Core: v1Core,
LoadBalancerEmulator: tunnel.NewLoadBalancerEmulator(v1Core),
conns: make(map[string]*sshConn),
connsToStop: make(map[string]*sshConn),
}
}

// Start ...
func (t *SSHTunnel) Start() error {
for {
select {
case <-t.ctx.Done():
_, err := t.LoadBalancerEmulator.Cleanup()
if err != nil {
glog.Errorf("error cleaning up: %v", err)
}
return err
default:
}

services, err := t.v1Core.Services("").List(metav1.ListOptions{})
if err != nil {
glog.Errorf("error listing services: %v", err)
}

t.markConnectionsToBeStopped()

for _, svc := range services.Items {
if svc.Spec.Type == v1.ServiceTypeLoadBalancer {
t.startConnection(svc)
}
}

t.stopMarkedConnections()

// TODO: which time to use?
time.Sleep(1 * time.Second)
}
}

func (t *SSHTunnel) markConnectionsToBeStopped() {
for _, conn := range t.conns {
t.connsToStop[conn.name] = conn
}
}

func (t *SSHTunnel) startConnection(svc v1.Service) {
uniqName := sshConnUniqName(svc)
existingSSHConn, ok := t.conns[uniqName]

if ok {
// if the svc still exist we remove the conn from the stopping list
delete(t.connsToStop, existingSSHConn.name)
return
}

// create new ssh conn
newSSHConn := createSSHConn(uniqName, t.sshPort, t.sshKey, svc)
t.conns[newSSHConn.name] = newSSHConn

go func() {
err := newSSHConn.run()
if err != nil {
glog.Errorf("error starting ssh tunnel: %v", err)
}

}()

err := t.LoadBalancerEmulator.PatchServiceIP(t.v1Core.RESTClient(), svc, "127.0.0.1")
if err != nil {
glog.Errorf("error patching service: %v", err)
}
}

func (t *SSHTunnel) stopMarkedConnections() {
for _, sshConn := range t.connsToStop {
err := sshConn.stop()
if err != nil {
glog.Errorf("error stopping ssh tunnel: %v", err)
}
delete(t.conns, sshConn.name)
delete(t.connsToStop, sshConn.name)
}
}

// sshConnName creates a uniq name for the tunnel, using its name/clusterIP/ports.
// This allows a new process to be created if an existing service was changed,
// the new process will support the IP/Ports change ocurred.
func sshConnUniqName(service v1.Service) string {
n := []string{
service.Name,
"-",
service.Spec.ClusterIP,
}

for _, port := range service.Spec.Ports {
n = append(n, fmt.Sprintf("-%d", port.Port))
}

return strings.Join(n, "")
}
38 changes: 26 additions & 12 deletions pkg/minikube/tunnel/loadbalancer_patcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,28 @@ type patchConverter interface {
convert(restClient rest.Interface, patch *Patch) *rest.Request
}

// loadBalancerEmulator is the main struct for emulating the loadbalancer behavior. it sets the ingress to the cluster IP
type loadBalancerEmulator struct {
// LoadBalancerEmulator is the main struct for emulating the loadbalancer behavior. it sets the ingress to the cluster IP
type LoadBalancerEmulator struct {
coreV1Client typed_core.CoreV1Interface
requestSender requestSender
patchConverter patchConverter
}

func (l *loadBalancerEmulator) PatchServices() ([]string, error) {
func (l *LoadBalancerEmulator) PatchServices() ([]string, error) {
return l.applyOnLBServices(l.updateService)
}

func (l *loadBalancerEmulator) Cleanup() ([]string, error) {
func (l *LoadBalancerEmulator) PatchServiceIP(restClient rest.Interface, svc core.Service, ip string) error {
// TODO: do not ignore result
_, err := l.updateServiceIP(restClient, svc, ip)
return err
}

func (l *LoadBalancerEmulator) Cleanup() ([]string, error) {
return l.applyOnLBServices(l.cleanupService)
}

func (l *loadBalancerEmulator) applyOnLBServices(action func(restClient rest.Interface, svc core.Service) ([]byte, error)) ([]string, error) {
func (l *LoadBalancerEmulator) applyOnLBServices(action func(restClient rest.Interface, svc core.Service) ([]byte, error)) ([]string, error) {
services := l.coreV1Client.Services("")
serviceList, err := services.List(meta.ListOptions{})
if err != nil {
Expand All @@ -79,14 +85,22 @@ func (l *loadBalancerEmulator) applyOnLBServices(action func(restClient rest.Int
}
return managedServices, nil
}
func (l *loadBalancerEmulator) updateService(restClient rest.Interface, svc core.Service) ([]byte, error) {

func (l *LoadBalancerEmulator) updateService(restClient rest.Interface, svc core.Service) ([]byte, error) {
clusterIP := svc.Spec.ClusterIP
ingresses := svc.Status.LoadBalancer.Ingress
if len(ingresses) == 1 && ingresses[0].IP == clusterIP {
return nil, nil
}
return l.updateServiceIP(restClient, svc, clusterIP)
}

func (l *LoadBalancerEmulator) updateServiceIP(restClient rest.Interface, svc core.Service, ip string) ([]byte, error) {
if len(ip) == 0 {
return nil, nil
}
glog.V(3).Infof("[%s] setting ClusterIP as the LoadBalancer Ingress", svc.Name)
jsonPatch := fmt.Sprintf(`[{"op": "add", "path": "/status/loadBalancer/ingress", "value": [ { "ip": "%s" } ] }]`, clusterIP)
jsonPatch := fmt.Sprintf(`[{"op": "add", "path": "/status/loadBalancer/ingress", "value": [ { "ip": "%s" } ] }]`, ip)
patch := &Patch{
Type: types.JSONPatchType,
ResourceName: svc.Name,
Expand All @@ -99,14 +113,14 @@ func (l *loadBalancerEmulator) updateService(restClient rest.Interface, svc core
request := l.patchConverter.convert(restClient, patch)
result, err := l.requestSender.send(request)
if err != nil {
glog.Errorf("error patching %s with IP %s: %s", svc.Name, clusterIP, err)
glog.Errorf("error patching %s with IP %s: %s", svc.Name, ip, err)
} else {
glog.Infof("Patched %s with IP %s", svc.Name, clusterIP)
glog.Infof("Patched %s with IP %s", svc.Name, ip)
}
return result, err
}

func (l *loadBalancerEmulator) cleanupService(restClient rest.Interface, svc core.Service) ([]byte, error) {
func (l *LoadBalancerEmulator) cleanupService(restClient rest.Interface, svc core.Service) ([]byte, error) {
ingresses := svc.Status.LoadBalancer.Ingress
if len(ingresses) == 0 {
return nil, nil
Expand All @@ -129,8 +143,8 @@ func (l *loadBalancerEmulator) cleanupService(restClient rest.Interface, svc cor

}

func newLoadBalancerEmulator(corev1Client typed_core.CoreV1Interface) loadBalancerEmulator {
return loadBalancerEmulator{
func NewLoadBalancerEmulator(corev1Client typed_core.CoreV1Interface) LoadBalancerEmulator {
return LoadBalancerEmulator{
coreV1Client: corev1Client,
requestSender: &defaultRequestSender{},
patchConverter: &defaultPatchConverter{},
Expand Down
8 changes: 4 additions & 4 deletions pkg/minikube/tunnel/loadbalancer_patcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestEmptyListOfServicesDoesNothing(t *testing.T) {
client := newStubCoreClient(&core.ServiceList{
Items: []core.Service{}})

patcher := newLoadBalancerEmulator(client)
patcher := NewLoadBalancerEmulator(client)

serviceNames, err := patcher.PatchServices()

Expand All @@ -113,7 +113,7 @@ func TestServicesWithNoLoadbalancerType(t *testing.T) {
},
})

patcher := newLoadBalancerEmulator(client)
patcher := NewLoadBalancerEmulator(client)

serviceNames, err := patcher.PatchServices()

Expand Down Expand Up @@ -214,7 +214,7 @@ func TestServicesWithLoadbalancerType(t *testing.T) {
requestSender := &countingRequestSender{}
patchConverter := &recordingPatchConverter{}

patcher := newLoadBalancerEmulator(client)
patcher := NewLoadBalancerEmulator(client)
patcher.requestSender = requestSender
patcher.patchConverter = patchConverter

Expand Down Expand Up @@ -328,7 +328,7 @@ func TestCleanupPatchedIPs(t *testing.T) {
requestSender := &countingRequestSender{}
patchConverter := &recordingPatchConverter{}

patcher := newLoadBalancerEmulator(client)
patcher := NewLoadBalancerEmulator(client)
patcher.requestSender = requestSender
patcher.patchConverter = patchConverter

Expand Down
Loading