Skip to content

Commit

Permalink
feat: Adding first version of port-forwarding
Browse files Browse the repository at this point in the history
Signed-off-by: Vincent Boutour <[email protected]>
  • Loading branch information
ViBiOh committed Aug 20, 2022
1 parent 4e6f629 commit 148f6be
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 6 deletions.
6 changes: 3 additions & 3 deletions cmd/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ var logCmd = &cobra.Command{
streamCancel.(context.CancelFunc)()
activeStreams.Delete(pod.UID)
} else if pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
handlePod(ctx, &activeStreams, streaming, kube, pod)
handleLogPod(ctx, &activeStreams, streaming, kube, *pod)
}

continue
Expand All @@ -112,7 +112,7 @@ var logCmd = &cobra.Command{
continue
}

handlePod(ctx, &activeStreams, streaming, kube, pod)
handleLogPod(ctx, &activeStreams, streaming, kube, *pod)
}

streaming.Wait()
Expand All @@ -130,7 +130,7 @@ func initLog() {
flags.BoolVarP(&dryRun, "dry-run", "d", false, "Dry-run, print only pods")
}

func handlePod(ctx context.Context, activeStreams *sync.Map, streaming *concurrent.Simple, kube client.Kube, pod *v1.Pod) {
func handleLogPod(ctx context.Context, activeStreams *sync.Map, streaming *concurrent.Simple, kube client.Kube, pod v1.Pod) {
for _, container := range pod.Spec.Containers {
if !isContainerSelected(container) {
continue
Expand Down
159 changes: 159 additions & 0 deletions cmd/port_forward.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package cmd

import (
"context"
"fmt"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"syscall"

"github.com/ViBiOh/kmux/pkg/client"
"github.com/ViBiOh/kmux/pkg/concurrent"
"github.com/ViBiOh/kmux/pkg/resource"
"github.com/spf13/cobra"
"github.com/spf13/viper"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
)

var portForwardCmd = &cobra.Command{
Use: "port-forward <resource_type> <resource_name> <local_port> <remote_port>",
Short: "Port forward to a ressources",
ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
if len(args) == 0 {
return []string{
"daemonsets",
"deployments",
"pods",
"statefulsets",
}, cobra.ShellCompDirectiveNoFileComp
}

if len(args) == 1 {
lister, err := resource.ListerFor(args[0])
if err != nil {
return nil, cobra.ShellCompDirectiveError
}

clients, err = getKubernetesClient(strings.Split(viper.GetString("context"), ","))
if err != nil {
return nil, cobra.ShellCompDirectiveError
}

return getCommonObjects(viper.GetString("namespace"), lister), cobra.ShellCompDirectiveNoFileComp
}

return nil, cobra.ShellCompDirectiveNoFileComp
},
Args: cobra.ExactValidArgs(4),
RunE: func(cmd *cobra.Command, args []string) error {
resourceType := args[0]
resourceName := args[1]
rawLocalPort := args[2]
rawRemotePort := args[3]

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

localPort, err := strconv.ParseUint(rawLocalPort, 10, 32)
if err != nil {
return fmt.Errorf("invalid local port: %s", rawLocalPort)
}

remotePort, err := strconv.ParseUint(rawRemotePort, 10, 32)
if err != nil {
return fmt.Errorf("invalid remote port: %s", rawRemotePort)
}

go func() {
waitForEnd(syscall.SIGINT, syscall.SIGTERM)
cancel()
}()

clients.Execute(func(kube client.Kube) error {
podWatcher, err := resource.WatchPods(ctx, kube, resourceType, resourceName, dryRun)
if err != nil {
return err
}

defer podWatcher.Stop()

var activeForwarding sync.Map

forwarding := concurrent.NewSimple()

for event := range podWatcher.ResultChan() {
pod, ok := event.Object.(*v1.Pod)
if !ok {
continue
}

forwardStop, ok := activeForwarding.Load(pod.UID)
if event.Type == watch.Deleted || pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
if ok {
close(forwardStop.(chan struct{}))
}

continue
}

if ok || pod.Status.Phase != v1.PodRunning {
continue
}

localPort += 1
handleForwardPod(kube, &activeForwarding, forwarding, *pod, localPort, remotePort)
}

activeForwarding.Range(func(key, value any) bool {
close(value.(chan struct{}))
return true
})

forwarding.Wait()

return nil
})

return nil
},
}

func handleForwardPod(kube client.Kube, activeForwarding *sync.Map, forwarding *concurrent.Simple, pod v1.Pod, localPort, remotePort uint64) {
stopChan := make(chan struct{})
activeForwarding.Store(pod.UID, stopChan)

forwarding.Go(func() {
defer activeForwarding.Delete(pod.UID)

kube.Info("Starting port-forward 127.0.0.1:%d => %s:%d", localPort, pod.Name, remotePort)
defer kube.Warn("Port-forward 127.0.0.1:%d => %s:%d ended.", localPort, pod.Name, remotePort)

if err := listenPortForward(kube, pod, stopChan, localPort, remotePort); err != nil {
kube.Err("Port-forward for %s failed: %s", pod.Name, err)
}
})
}

func listenPortForward(kube client.Kube, pod v1.Pod, stopChan chan struct{}, localPort, podPort uint64) error {
path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward", pod.Namespace, pod.Name)
hostIP := strings.TrimPrefix(kube.Config.Host, "https://")

transport, upgrader, err := spdy.RoundTripperFor(kube.Config)
if err != nil {
return fmt.Errorf("transport: %w", err)
}

dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, http.MethodPost, &url.URL{Scheme: "https", Path: path, Host: hostIP})
forwarder, err := portforward.New(dialer, []string{fmt.Sprintf("%d:%d", localPort, podPort)}, stopChan, nil, nil, kube.Outputter)
if err != nil {
return fmt.Errorf("dial: %w", err)
}

return forwarder.ForwardPorts()
}
3 changes: 2 additions & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func getKubeClient(configRules clientcmd.ClientConfigLoader, context string) (cl
return client.Kube{}, fmt.Errorf("create kubernetes client: %w", err)
}

return client.New(context, namespace, clientset), nil
return client.New(context, namespace, k8sConfig, clientset), nil
}

func init() {
Expand Down Expand Up @@ -141,6 +141,7 @@ func init() {
rootCmd.AddCommand(versionCmd)
rootCmd.AddCommand(imageCmd)
rootCmd.AddCommand(restartCmd)
rootCmd.AddCommand(portForwardCmd)

initWatch()
rootCmd.AddCommand(watchCmd)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ require (
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/moby/spdystream v0.2.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ github.com/PuerkitoBio/purell v1.1.1 h1:WEQqlqaGbrPkxLJWfBwQmfEAE1Z7ONdDLqrN38tN
github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
Expand All @@ -69,6 +70,7 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153 h1:yUdfgN0XgIJw7foRItutHYUIhlcKzcSf5vDpdhQAKTc=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/emicklei/go-restful v2.9.5+incompatible h1:spTtZBk5DYEvbxMVutUuTyh1Ao2r4iyvLdACqsl/Ljk=
Expand Down Expand Up @@ -226,6 +228,7 @@ github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27k
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/moby/spdystream v0.2.0 h1:cjW1zVyyoiM0T7b6UoySUFqzXMoqRckQtXwGPiBhOM8=
github.com/moby/spdystream v0.2.0/go.mod h1:f7i0iNDQJ059oMTcWxx8MA/zKFIuD/lY+0GqbN2Wy8c=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
Expand Down
5 changes: 4 additions & 1 deletion pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,22 @@ import (
"github.com/ViBiOh/kmux/pkg/concurrent"
"github.com/ViBiOh/kmux/pkg/output"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

type Kube struct {
output.Outputter
*kubernetes.Clientset
Config *rest.Config
Name string
Namespace string
}

func New(name, namespace string, clientset *kubernetes.Clientset) Kube {
func New(name, namespace string, config *rest.Config, clientset *kubernetes.Clientset) Kube {
return Kube{
Outputter: output.NewOutputter(name),
Clientset: clientset,
Config: config,
Name: name,
Namespace: namespace,
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/output/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ func NewOutputter(name string) Outputter {
}
}

func (o Outputter) Write(payload []byte) (int, error) {
Std(o.prefix, "%s", payload)
return len(payload), nil
}

func (o Outputter) Std(format string, args ...any) {
Std(o.prefix, format, args...)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/output/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ func startPrinter() {
defer close(done)

for outputEvent := range outputChan {
for _, line := range strings.Split(outputEvent.message, "\n") {
message := strings.TrimSuffix(outputEvent.message, "\n")

for _, line := range strings.Split(message, "\n") {
if len(outputEvent.prefix) > 0 {
fmt.Fprint(os.Stderr, outputEvent.prefix)
}
Expand Down

0 comments on commit 148f6be

Please sign in to comment.