Skip to content

Commit

Permalink
refactor: container killer refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed Jun 29, 2021
1 parent 7cbb938 commit f289249
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 39 deletions.
43 changes: 4 additions & 39 deletions shared/containerkiller/container_killer.go
Original file line number Diff line number Diff line change
@@ -1,59 +1,24 @@
package containerkiller

import (
"fmt"
"os"

dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1"
"github.com/argoproj-labs/argo-dataflow/shared/podexec"
"github.com/argoproj-labs/argo-dataflow/shared/util"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
)

var logger = util.NewLogger()

type Interface interface {
KillContainer(pod corev1.Pod, container string) error
}

type containerKiller struct {
kubernetes.Interface
*rest.Config
podexec.Interface
}

func New(k kubernetes.Interface, r *rest.Config) Interface {
return &containerKiller{Interface: k, Config: r}
}

func (k *containerKiller) killContainer(namespace, pod, container string, commands []string) error {
// running the wrong command appears to bork the container, so we need to make a call about which command to run
x := k.Interface.CoreV1().RESTClient().Post().
Resource("pods").
Namespace(namespace).
Name(pod).
SubResource("exec").
Param("container", container).
Param("stdout", "true").
Param("stderr", "true").
Param("tty", "false")
for _, command := range commands {
x = x.Param("command", command)
}
logger.Info("killing container", "pod", pod, "container", container, "commands", commands)
exec, err := remotecommand.NewSPDYExecutor(k.Config, "POST", x.URL())
if err != nil {
return fmt.Errorf("failed to exec: %w", err)
}
if err := exec.Stream(remotecommand.StreamOptions{
Stdout: os.Stdout,
Stderr: os.Stderr,
Tty: true,
}); util.IgnoreNotFound(util.IgnoreContainerNotFound(err)) != nil {
return fmt.Errorf("failed to stream: %w", err)
}
return nil
return &containerKiller{podexec.New(k, r)}
}

func (k *containerKiller) KillContainer(pod corev1.Pod, container string) error {
Expand All @@ -66,5 +31,5 @@ func (k *containerKiller) KillContainer(pod corev1.Pod, container string) error
if text, ok := pod.Annotations[dfv1.KeyKillCmd(container)]; ok {
util.MustUnJSON(text, &commands)
}
return k.killContainer(pod.Namespace, pod.Name, container, commands)
return k.Exec(pod.Namespace, pod.Name, container, commands)
}
46 changes: 46 additions & 0 deletions shared/podexec/podexec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package podexec

import (
"fmt"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
"os"
)

type Interface interface {
Exec(namespace, podName, container string, commands []string) error
}

type podExec struct {
kubernetes.Interface
*rest.Config
}

func New(k kubernetes.Interface, r *rest.Config) Interface {
return &podExec{Interface: k, Config: r}
}

func (k *podExec) Exec(namespace, pod, container string, commands []string) error {
// running the wrong command appears to bork the container, so we need to make a call about which command to run
x := k.Interface.CoreV1().RESTClient().Post().
Resource("pods").
Namespace(namespace).
Name(pod).
SubResource("exec").
Param("container", container).
Param("stdout", "true").
Param("stderr", "true").
Param("tty", "true")
for _, command := range commands {
x = x.Param("command", command)
}
exec, err := remotecommand.NewSPDYExecutor(k.Config, "POST", x.URL())
if err != nil {
return fmt.Errorf("failed to exec %q: %w", commands, err)
}
if err := exec.Stream(remotecommand.StreamOptions{Stdout: os.Stdout, Stderr: os.Stderr, Tty: true}); err != nil {
return fmt.Errorf("failed to stream %q: %w", commands, err)
}
return nil
}

0 comments on commit f289249

Please sign in to comment.