Commit

Merge pull request #6460 from josedonizetti/kic-darwin-tunnel
WIP: kic darwin tunnel
medyagh authored Feb 24, 2020
2 parents 5ee57d4 + b201d4b commit 5fe81c8
Showing 6 changed files with 247 additions and 23 deletions.
31 changes: 28 additions & 3 deletions cmd/minikube/cmd/tunnel.go
@@ -20,16 +20,23 @@ import (
"context"
"os"
"os/signal"
"path/filepath"
"runtime"
"strconv"
"time"

"github.com/golang/glog"
"github.com/spf13/cobra"
"github.com/spf13/viper"

"k8s.io/minikube/pkg/drivers/kic/oci"
"k8s.io/minikube/pkg/minikube/config"
"k8s.io/minikube/pkg/minikube/exit"
"k8s.io/minikube/pkg/minikube/localpath"
"k8s.io/minikube/pkg/minikube/machine"
"k8s.io/minikube/pkg/minikube/service"
"k8s.io/minikube/pkg/minikube/tunnel"
"k8s.io/minikube/pkg/minikube/tunnel/kic"
)

var cleanup bool
@@ -69,6 +76,11 @@ var tunnelCmd = &cobra.Command{
exit.WithError("error creating clientset", err)
}

cfg, err := config.Load(viper.GetString(config.MachineProfile))
if err != nil {
exit.WithError("Error getting config", err)
}

ctrlC := make(chan os.Signal, 1)
signal.Notify(ctrlC, os.Interrupt)
ctx, cancel := context.WithCancel(context.Background())
@@ -77,10 +89,23 @@ var tunnelCmd = &cobra.Command{
cancel()
}()

cfg, err := config.Load(viper.GetString(config.MachineProfile))
if err != nil {
exit.WithError("Error getting config", err)
if runtime.GOOS == "darwin" && cfg.VMDriver == "docker" {
port, err := oci.HostPortBinding("docker", "minikube", 22)
if err != nil {
exit.WithError("error getting ssh port", err)
}
sshPort := strconv.Itoa(port)
sshKey := filepath.Join(localpath.MiniPath(), "machines", cfg.Name, "id_rsa")

kicSSHTunnel := kic.NewSSHTunnel(ctx, sshPort, sshKey, clientset.CoreV1())
err = kicSSHTunnel.Start()
if err != nil {
exit.WithError("error starting tunnel", err)
}

return
}

done, err := manager.StartTunnel(ctx, cfg.Name, api, config.DefaultLoader, clientset.CoreV1())
if err != nil {
exit.WithError("error starting tunnel", err)
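
On macOS with the docker driver, the node's sshd is reachable only through the host port that Docker publishes for container port 22, which is what oci.HostPortBinding("docker", "minikube", 22) resolves above. As an illustration only (not the actual oci implementation), here is a self-contained sketch of that lookup done by shelling out to the Docker CLI:

package main

import (
	"fmt"
	"os/exec"
	"strconv"
	"strings"
)

// hostSSHPort shells out to `docker port <container> 22/tcp`, which prints a
// mapping such as "0.0.0.0:32770", and returns the host-side port number.
func hostSSHPort(container string) (int, error) {
	out, err := exec.Command("docker", "port", container, "22/tcp").Output()
	if err != nil {
		return 0, fmt.Errorf("docker port %s: %v", container, err)
	}
	// Take the first mapping line (there may be one per address family).
	line := strings.SplitN(strings.TrimSpace(string(out)), "\n", 2)[0]
	idx := strings.LastIndex(line, ":")
	if idx < 0 {
		return 0, fmt.Errorf("unexpected docker port output: %q", line)
	}
	return strconv.Atoi(line[idx+1:])
}

func main() {
	port, err := hostSSHPort("minikube")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("forwarded ssh port:", port)
}
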
51 changes: 51 additions & 0 deletions pkg/minikube/tunnel/kic/ssh_conn.go
@@ -0,0 +1,51 @@
package kic

import (
"fmt"
"os/exec"

v1 "k8s.io/api/core/v1"
)

type sshConn struct {
name string
cmd *exec.Cmd
}

func createSSHConn(name, sshPort, sshKey string, svc v1.Service) *sshConn {
// base ssh args: run no remote command (-N) and connect as the docker user on the forwarded port
sshArgs := []string{
"-N",
"[email protected]",
"-p", sshPort,
"-i", sshKey,
}

for _, port := range svc.Spec.Ports {
arg := fmt.Sprintf(
"-L %d:%s:%d",
port.Port,
svc.Spec.ClusterIP,
port.Port,
)

sshArgs = append(sshArgs, arg)
}

cmd := exec.Command("ssh", sshArgs...)

return &sshConn{
name: name,
cmd: cmd,
}
}

func (c *sshConn) run() error {
fmt.Printf("starting tunnel for %s\n", c.name)
return c.cmd.Run()
}

func (c *sshConn) stop() error {
fmt.Printf("stopping tunnel for %s\n", c.name)
return c.cmd.Process.Kill()
}
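
A hypothetical usage sketch, which would have to live in package kic since createSSHConn is unexported; the service name, ports, ssh port, and key path below are made-up values:

package kic

import (
	"strings"
	"testing"

	v1 "k8s.io/api/core/v1"
)

func TestCreateSSHConnCommand(t *testing.T) {
	// Fake LoadBalancer service, values chosen for illustration only.
	svc := v1.Service{}
	svc.Name = "nginx"
	svc.Spec.Type = v1.ServiceTypeLoadBalancer
	svc.Spec.ClusterIP = "10.96.0.10"
	svc.Spec.Ports = []v1.ServicePort{{Port: 80}, {Port: 443}}

	conn := createSSHConn("nginx-10.96.0.10-80-443", "32770", "/Users/me/.minikube/machines/minikube/id_rsa", svc)

	// Roughly: ssh -N docker@127.0.0.1 -p 32770 -i .../id_rsa -L 80:10.96.0.10:80 -L 443:10.96.0.10:443
	t.Log(strings.Join(conn.cmd.Args, " "))
}

Note that each -L flag and its forwarding spec are emitted as a single argument containing a space; if ssh rejects that form, passing them as separate arguments ("-L", "80:10.96.0.10:80") would be the safer construction.
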
134 changes: 134 additions & 0 deletions pkg/minikube/tunnel/kic/ssh_tunnel.go
@@ -0,0 +1,134 @@
package kic

import (
"context"
"fmt"
"strings"
"time"

"github.com/golang/glog"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
typed_core "k8s.io/client-go/kubernetes/typed/core/v1"

"k8s.io/minikube/pkg/minikube/tunnel"
)

// SSHTunnel forwards LoadBalancer service ports from the host to the cluster over SSH.
type SSHTunnel struct {
ctx context.Context
sshPort string
sshKey string
v1Core typed_core.CoreV1Interface
LoadBalancerEmulator tunnel.LoadBalancerEmulator
conns map[string]*sshConn
connsToStop map[string]*sshConn
}

// NewSSHTunnel returns a new SSHTunnel that connects through the given SSH port and key.
func NewSSHTunnel(ctx context.Context, sshPort, sshKey string, v1Core typed_core.CoreV1Interface) *SSHTunnel {
return &SSHTunnel{
ctx: ctx,
sshPort: sshPort,
sshKey: sshKey,
v1Core: v1Core,
LoadBalancerEmulator: tunnel.NewLoadBalancerEmulator(v1Core),
conns: make(map[string]*sshConn),
connsToStop: make(map[string]*sshConn),
}
}

// Start polls for LoadBalancer services and keeps an ssh port-forward running for each until the context is cancelled.
func (t *SSHTunnel) Start() error {
for {
select {
case <-t.ctx.Done():
_, err := t.LoadBalancerEmulator.Cleanup()
if err != nil {
glog.Errorf("error cleaning up: %v", err)
}
return err
default:
}

services, err := t.v1Core.Services("").List(metav1.ListOptions{})
if err != nil {
glog.Errorf("error listing services: %v", err)
}

t.markConnectionsToBeStopped()

for _, svc := range services.Items {
if svc.Spec.Type == v1.ServiceTypeLoadBalancer {
t.startConnection(svc)
}
}

t.stopMarkedConnections()

// TODO: decide on an appropriate polling interval
time.Sleep(1 * time.Second)
}
}

func (t *SSHTunnel) markConnectionsToBeStopped() {
for _, conn := range t.conns {
t.connsToStop[conn.name] = conn
}
}

func (t *SSHTunnel) startConnection(svc v1.Service) {
uniqName := sshConnUniqName(svc)
existingSSHConn, ok := t.conns[uniqName]

if ok {
// if the svc still exists, remove the conn from the stopping list
delete(t.connsToStop, existingSSHConn.name)
return
}

// create new ssh conn
newSSHConn := createSSHConn(uniqName, t.sshPort, t.sshKey, svc)
t.conns[newSSHConn.name] = newSSHConn

go func() {
err := newSSHConn.run()
if err != nil {
glog.Errorf("error starting ssh tunnel: %v", err)
}

}()

err := t.LoadBalancerEmulator.PatchServiceIP(t.v1Core.RESTClient(), svc, "127.0.0.1")
if err != nil {
glog.Errorf("error patching service: %v", err)
}
}

func (t *SSHTunnel) stopMarkedConnections() {
for _, sshConn := range t.connsToStop {
err := sshConn.stop()
if err != nil {
glog.Errorf("error stopping ssh tunnel: %v", err)
}
delete(t.conns, sshConn.name)
delete(t.connsToStop, sshConn.name)
}
}

// sshConnUniqName creates a unique name for the tunnel from the service name, clusterIP and ports.
// This allows a new process to be created if an existing service changes;
// the new process will pick up the changed IP/ports.
func sshConnUniqName(service v1.Service) string {
n := []string{
service.Name,
"-",
service.Spec.ClusterIP,
}

for _, port := range service.Spec.Ports {
n = append(n, fmt.Sprintf("-%d", port.Port))
}

return strings.Join(n, "")
}
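
A minimal caller sketch, mirroring the wiring in cmd/minikube/cmd/tunnel.go above but built against a plain kubeconfig; the ssh port and key path are placeholders, and the client-go calls assume the context-free signatures this PR targets:

package main

import (
	"context"
	"log"
	"os"
	"os/signal"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"

	"k8s.io/minikube/pkg/minikube/tunnel/kic"
)

func main() {
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		log.Fatal(err)
	}
	clientset, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		log.Fatal(err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	go func() {
		<-c
		cancel() // Start() observes ctx.Done(), cleans up patched services and returns
	}()

	// Placeholder ssh port and key path for a kic node named "minikube".
	t := kic.NewSSHTunnel(ctx, "32770", os.Getenv("HOME")+"/.minikube/machines/minikube/id_rsa", clientset.CoreV1())
	if err := t.Start(); err != nil {
		log.Fatal(err)
	}
}
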
38 changes: 26 additions & 12 deletions pkg/minikube/tunnel/loadbalancer_patcher.go
@@ -37,22 +37,28 @@ type patchConverter interface {
convert(restClient rest.Interface, patch *Patch) *rest.Request
}

// loadBalancerEmulator is the main struct for emulating the loadbalancer behavior. it sets the ingress to the cluster IP
type loadBalancerEmulator struct {
// LoadBalancerEmulator is the main struct for emulating the load balancer behavior. It sets the ingress to the cluster IP.
type LoadBalancerEmulator struct {
coreV1Client typed_core.CoreV1Interface
requestSender requestSender
patchConverter patchConverter
}

func (l *loadBalancerEmulator) PatchServices() ([]string, error) {
func (l *LoadBalancerEmulator) PatchServices() ([]string, error) {
return l.applyOnLBServices(l.updateService)
}

func (l *loadBalancerEmulator) Cleanup() ([]string, error) {
func (l *LoadBalancerEmulator) PatchServiceIP(restClient rest.Interface, svc core.Service, ip string) error {
// TODO: do not ignore result
_, err := l.updateServiceIP(restClient, svc, ip)
return err
}

func (l *LoadBalancerEmulator) Cleanup() ([]string, error) {
return l.applyOnLBServices(l.cleanupService)
}

func (l *loadBalancerEmulator) applyOnLBServices(action func(restClient rest.Interface, svc core.Service) ([]byte, error)) ([]string, error) {
func (l *LoadBalancerEmulator) applyOnLBServices(action func(restClient rest.Interface, svc core.Service) ([]byte, error)) ([]string, error) {
services := l.coreV1Client.Services("")
serviceList, err := services.List(meta.ListOptions{})
if err != nil {
@@ -79,14 +85,22 @@ func (l *loadBalancerEmulator) applyOnLBServices(action func(restClient rest.Int
}
return managedServices, nil
}
func (l *loadBalancerEmulator) updateService(restClient rest.Interface, svc core.Service) ([]byte, error) {

func (l *LoadBalancerEmulator) updateService(restClient rest.Interface, svc core.Service) ([]byte, error) {
clusterIP := svc.Spec.ClusterIP
ingresses := svc.Status.LoadBalancer.Ingress
if len(ingresses) == 1 && ingresses[0].IP == clusterIP {
return nil, nil
}
return l.updateServiceIP(restClient, svc, clusterIP)
}

func (l *LoadBalancerEmulator) updateServiceIP(restClient rest.Interface, svc core.Service, ip string) ([]byte, error) {
if len(ip) == 0 {
return nil, nil
}
glog.V(3).Infof("[%s] setting ClusterIP as the LoadBalancer Ingress", svc.Name)
jsonPatch := fmt.Sprintf(`[{"op": "add", "path": "/status/loadBalancer/ingress", "value": [ { "ip": "%s" } ] }]`, clusterIP)
jsonPatch := fmt.Sprintf(`[{"op": "add", "path": "/status/loadBalancer/ingress", "value": [ { "ip": "%s" } ] }]`, ip)
patch := &Patch{
Type: types.JSONPatchType,
ResourceName: svc.Name,
@@ -99,14 +113,14 @@ func (l *loadBalancerEmulator) updateService(restClient rest.Interface, svc core
request := l.patchConverter.convert(restClient, patch)
result, err := l.requestSender.send(request)
if err != nil {
glog.Errorf("error patching %s with IP %s: %s", svc.Name, clusterIP, err)
glog.Errorf("error patching %s with IP %s: %s", svc.Name, ip, err)
} else {
glog.Infof("Patched %s with IP %s", svc.Name, clusterIP)
glog.Infof("Patched %s with IP %s", svc.Name, ip)
}
return result, err
}

func (l *loadBalancerEmulator) cleanupService(restClient rest.Interface, svc core.Service) ([]byte, error) {
func (l *LoadBalancerEmulator) cleanupService(restClient rest.Interface, svc core.Service) ([]byte, error) {
ingresses := svc.Status.LoadBalancer.Ingress
if len(ingresses) == 0 {
return nil, nil
@@ -129,8 +143,8 @@ func (l *loadBalancerEmulator) cleanupService(restClient rest.Interface, svc cor

}

func newLoadBalancerEmulator(corev1Client typed_core.CoreV1Interface) loadBalancerEmulator {
return loadBalancerEmulator{
func NewLoadBalancerEmulator(corev1Client typed_core.CoreV1Interface) LoadBalancerEmulator {
return LoadBalancerEmulator{
coreV1Client: corev1Client,
requestSender: &defaultRequestSender{},
patchConverter: &defaultPatchConverter{},
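
For reference, the same JSON patch that updateServiceIP sends through the raw REST client could be applied with the typed client-go API. This is an illustrative alternative, not the PR's code path; the namespace, the service name, and the use of the "status" subresource are assumptions, and the Patch call uses the context-free signature of the client-go version this PR builds against:

package main

import (
	"fmt"
	"log"

	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		log.Fatal(err)
	}
	client, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		log.Fatal(err)
	}

	// The same patch body that updateServiceIP builds, with the ingress IP substituted in.
	ip := "127.0.0.1"
	patch := []byte(fmt.Sprintf(
		`[{"op": "add", "path": "/status/loadBalancer/ingress", "value": [{"ip": "%s"}]}]`, ip))

	// Hypothetical target: a LoadBalancer service "nginx" in the "default" namespace.
	if _, err := client.CoreV1().Services("default").Patch("nginx", types.JSONPatchType, patch, "status"); err != nil {
		log.Fatal(err)
	}
}
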
8 changes: 4 additions & 4 deletions pkg/minikube/tunnel/loadbalancer_patcher_test.go
@@ -87,7 +87,7 @@ func TestEmptyListOfServicesDoesNothing(t *testing.T) {
client := newStubCoreClient(&core.ServiceList{
Items: []core.Service{}})

patcher := newLoadBalancerEmulator(client)
patcher := NewLoadBalancerEmulator(client)

serviceNames, err := patcher.PatchServices()

@@ -113,7 +113,7 @@ func TestServicesWithNoLoadbalancerType(t *testing.T) {
},
})

patcher := newLoadBalancerEmulator(client)
patcher := NewLoadBalancerEmulator(client)

serviceNames, err := patcher.PatchServices()

@@ -214,7 +214,7 @@ func TestServicesWithLoadbalancerType(t *testing.T) {
requestSender := &countingRequestSender{}
patchConverter := &recordingPatchConverter{}

patcher := newLoadBalancerEmulator(client)
patcher := NewLoadBalancerEmulator(client)
patcher.requestSender = requestSender
patcher.patchConverter = patchConverter

@@ -328,7 +328,7 @@ func TestCleanupPatchedIPs(t *testing.T) {
requestSender := &countingRequestSender{}
patchConverter := &recordingPatchConverter{}

patcher := newLoadBalancerEmulator(client)
patcher := NewLoadBalancerEmulator(client)
patcher.requestSender = requestSender
patcher.patchConverter = patchConverter
