diff --git a/examples/bench/example.go b/examples/bench/example.go index 1d925c014f027..00b0c5b699cb9 100644 --- a/examples/bench/example.go +++ b/examples/bench/example.go @@ -19,6 +19,7 @@ import ( "context" "fmt" "os" + "strings" "time" "github.com/gravitational/teleport/lib/benchmark" @@ -34,7 +35,16 @@ func main() { } // Run Linear generator - results, err := benchmark.Run(context.TODO(), linear, "ls -l /", "host", "username", "teleport.example.com") + results, err := benchmark.Run( + context.TODO(), + linear, + "host", + "username", + "teleport.example.com", + benchmark.SSHBenchmark{ + Command: strings.Split("ls -l /", " "), + }, + ) if err != nil { fmt.Println(err) os.Exit(1) diff --git a/lib/benchmark/benchmark.go b/lib/benchmark/benchmark.go index 565d9dc86606f..c52c52a52c526 100644 --- a/lib/benchmark/benchmark.go +++ b/lib/benchmark/benchmark.go @@ -24,7 +24,6 @@ import ( "os" "os/signal" "path/filepath" - "strings" "syscall" "time" @@ -34,7 +33,6 @@ import ( "github.com/gravitational/teleport/lib/client" "github.com/gravitational/teleport/lib/observability/tracing" - "github.com/gravitational/teleport/lib/utils" ) const ( @@ -48,14 +46,20 @@ const ( pauseTimeBetweenBenchmarks = time.Second * 5 ) +// Service is a the Teleport service to benchmark. +type Service string + +const ( + // SSHService is the SSH service + SSHService Service = "ssh" + // KubernetesService is the Kubernetes service + KubernetesService Service = "kube" +) + // Config specifies benchmark requests to run type Config struct { // Rate is requests per second origination rate Rate int - // Command is a command to run - Command []string - // Interactive turns on interactive sessions - Interactive bool // MinimumWindow is the min duration MinimumWindow time.Duration // MinimumMeasurments is the min amount of requests @@ -79,9 +83,8 @@ type Result struct { // Run is used to run the benchmarks, it is given a generator, command to run, // a host, host login, and proxy. 
// WorkloadFunc is a function that executes a single benchmark call.
type WorkloadFunc func(context.Context) error

// BenchmarkSuite is an interface that defines a benchmark suite.
type BenchmarkSuite interface {
	// BenchBuilder returns a function that executes a single benchmark call.
	// The returned function is called in a loop until the context is canceled.
	BenchBuilder(context.Context, *client.TeleportClient) (WorkloadFunc, error)
}

// benchMeasure records the timing and outcome of a single benchmark call.
type benchMeasure struct {
	// ResponseStart is the instant the call was scheduled to begin.
	ResponseStart time.Time
	// End is the instant the workload finished.
	End time.Time
	// Error is the workload result; nil on success.
	Error error
}

// work runs a single workload call, stamps its completion time and error into
// the measurement, and delivers it on send — unless ctx is canceled first, in
// which case the measurement is dropped so the sender does not block forever.
func work(ctx context.Context, m benchMeasure, send chan<- benchMeasure, workload WorkloadFunc) {
	m.Error = workload(ctx)
	m.End = time.Now()
	select {
	case send <- m:
	case <-ctx.Done():
	}
}
return m.client.SSH(context.TODO(), m.command, false) - } - config := m.client.Config - client, err := client.NewClient(&config) - if err != nil { - return err - } - reader, writer := io.Pipe() - defer reader.Close() - defer writer.Close() - client.Stdin = reader - out := &utils.SyncBuffer{} - client.Stdout = out - client.Stderr = out - err = m.client.SSH(context.TODO(), nil, false) - if err != nil { - return err - } - writer.Write([]byte(strings.Join(m.command, " ") + "\r\nexit\r\n")) - return nil -} - // makeTeleportClient creates an instance of a teleport client func makeTeleportClient(host, login, proxy string) (*client.TeleportClient, error) { c := client.Config{ diff --git a/lib/benchmark/benchmark_suites.go b/lib/benchmark/benchmark_suites.go new file mode 100644 index 0000000000000..79c1443dd2dc7 --- /dev/null +++ b/lib/benchmark/benchmark_suites.go @@ -0,0 +1,250 @@ +/* +Copyright 2023 Gravitational, Inc. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package benchmark package provides tools to run progressive or independent benchmarks against teleport services. 
+package benchmark + +import ( + "bytes" + "context" + "io" + "net/http" + "strings" + + "github.com/gravitational/trace" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/remotecommand" + "k8s.io/kubectl/pkg/scheme" + + "github.com/gravitational/teleport/lib/client" + "github.com/gravitational/teleport/lib/utils" +) + +// SSHBenchmark is a benchmark suite that runs a single SSH command +// against a Teleport node for a given duration and rate. +type SSHBenchmark struct { + // Command is a command to run + Command []string + // Interactive turns on interactive sessions + Interactive bool +} + +// BenchBuilder returns a WorkloadFunc for the given benchmark suite. +func (s SSHBenchmark) BenchBuilder(ctx context.Context, tc *client.TeleportClient) (WorkloadFunc, error) { + return func(ctx context.Context) error { + if !s.Interactive { + // do not use parent context that will cancel in flight requests + // because we give test some time to gracefully wrap up + // the in-flight connections to avoid extra errors + return tc.SSH(ctx, s.Command, false) + } + config := tc.Config + client, err := client.NewClient(&config) + if err != nil { + return err + } + reader, writer := io.Pipe() + defer reader.Close() + defer writer.Close() + client.Stdin = reader + out := &utils.SyncBuffer{} + client.Stdout = out + client.Stderr = out + err = tc.SSH(ctx, nil, false) + if err != nil { + return err + } + writer.Write([]byte(strings.Join(s.Command, " ") + "\r\nexit\r\n")) + return nil + }, nil +} + +// KubeListBenchmark is a benchmark suite that runs successive kubectl get pods +// against a Teleport Kubernetes proxy for a given duration and rate. +type KubeListBenchmark struct { + // Namespace is the Kubernetes namespace to run the command against. + // If empty, it will include pods from all namespaces. 
+ Namespace string +} + +// BenchBuilder returns a WorkloadFunc for the given benchmark suite. +func (k KubeListBenchmark) BenchBuilder(ctx context.Context, tc *client.TeleportClient) (WorkloadFunc, error) { + restCfg, err := newKubernetesRestConfig(ctx, tc) + if err != nil { + return nil, trace.Wrap(err) + } + clientset, err := kubernetes.NewForConfig(restCfg) + if err != nil { + return nil, trace.Wrap(err) + } + + return func(ctx context.Context) error { + // List all pods in all namespaces. + _, err := clientset.CoreV1().Pods(k.Namespace).List(ctx, metav1.ListOptions{}) + return trace.Wrap(err) + }, nil +} + +// newKubernetesRestConfig returns a new rest.Config for the kubernetes cluster +// that the client wants to connected to. +func newKubernetesRestConfig(ctx context.Context, tc *client.TeleportClient) (*rest.Config, error) { + tlsClientConfig, err := getKubeTLSClientConfig(ctx, tc) + if err != nil { + return nil, trace.Wrap(err) + } + restConfig := &rest.Config{ + Host: tc.KubeClusterAddr(), + TLSClientConfig: tlsClientConfig, + APIPath: "/api", + ContentConfig: rest.ContentConfig{ + GroupVersion: &schema.GroupVersion{Version: "v1"}, + NegotiatedSerializer: scheme.Codecs, + }, + } + return restConfig, nil +} + +// getKubeTLSClientConfig returns a TLS client config for the kubernetes cluster +// that the client wants to connected to. 
+func getKubeTLSClientConfig(ctx context.Context, tc *client.TeleportClient) (rest.TLSClientConfig, error) { + var k *client.Key + err := client.RetryWithRelogin(ctx, tc, func() error { + var err error + k, err = tc.IssueUserCertsWithMFA(ctx, client.ReissueParams{ + RouteToCluster: tc.SiteName, + KubernetesCluster: tc.KubernetesCluster, + }, nil /*applyOpts*/) + return err + }) + if err != nil { + return rest.TLSClientConfig{}, trace.Wrap(err) + } + + certPem := k.KubeTLSCerts[tc.KubernetesCluster] + + rsaKeyPEM, err := k.PrivateKey.RSAPrivateKeyPEM() + if err != nil { + return rest.TLSClientConfig{}, trace.Wrap(err) + } + + credentials, err := tc.LocalAgent().GetCoreKey() + if err != nil { + return rest.TLSClientConfig{}, trace.Wrap(err) + } + + var clusterCAs [][]byte + if tc.LoadAllCAs { + clusterCAs = credentials.TLSCAs() + } else { + clusterCAs, err = credentials.RootClusterCAs() + if err != nil { + return rest.TLSClientConfig{}, trace.Wrap(err) + } + } + if len(clusterCAs) == 0 { + return rest.TLSClientConfig{}, trace.BadParameter("no trusted CAs found") + } + + tlsServerName := "" + if tc.TLSRoutingEnabled { + k8host, _ := tc.KubeProxyHostPort() + tlsServerName = client.GetKubeTLSServerName(k8host) + } + + return rest.TLSClientConfig{ + CAData: bytes.Join(clusterCAs, []byte("\n")), + CertData: certPem, + KeyData: rsaKeyPEM, + ServerName: tlsServerName, + }, nil +} + +// KubeListBenchmark is a benchmark suite that runs successive kubectl exec +// against a Teleport Kubernetes proxy for a given duration and rate. +type KubeExecBenchmark struct { + // Namespace is the Kubernetes namespace to run the command against. + Namespace string + // PodName is the name of the pod to run the command against. + PodName string + // ContainerName is the name of the container to run the command against. + ContainerName string + // Command is the command to run. 
+ Command []string + // Interactive turns on interactive sessions + Interactive bool +} + +// BenchBuilder returns a WorkloadFunc for the given benchmark suite. +func (k KubeExecBenchmark) BenchBuilder(ctx context.Context, tc *client.TeleportClient) (WorkloadFunc, error) { + restCfg, err := newKubernetesRestConfig(ctx, tc) + if err != nil { + return nil, trace.Wrap(err) + } + if k.Interactive { + // If interactive, we need to set up a pty and we cannot use the + // stderr stream because the server will hang. + tc.Stderr = nil + } else { + // If not interactive, we need to set up stdin to be nil so that + // the server wont wait for input. + tc.Stdin = nil + } + exec, err := k.kubeExecOnPod(ctx, tc, restCfg) + if err != nil { + return nil, trace.Wrap(err) + } + return func(ctx context.Context) error { + stdin := tc.Stdin + if k.Interactive { + stdin = bytes.NewBuffer([]byte(strings.Join(k.Command, " ") + "\r\nexit\r\n")) + } + err := exec.StreamWithContext(ctx, remotecommand.StreamOptions{ + Stdin: stdin, + Stdout: tc.Stdout, + Stderr: tc.Stderr, + Tty: k.Interactive, + }) + return trace.Wrap(err) + }, nil +} + +func (k KubeExecBenchmark) kubeExecOnPod(ctx context.Context, tc *client.TeleportClient, restConfig *rest.Config) (remotecommand.Executor, error) { + restClient, err := rest.RESTClientFor(restConfig) + if err != nil { + return nil, trace.Wrap(err) + } + + req := restClient.Post(). + Resource("pods"). + Name(k.PodName). + Namespace(k.Namespace). 
+ SubResource("exec") + + req.VersionedParams(&corev1.PodExecOptions{ + Container: k.ContainerName, + Command: k.Command, + Stdin: tc.Stdin != nil, + Stdout: tc.Stdout != nil, + Stderr: tc.Stderr != nil, + TTY: k.Interactive, + }, scheme.ParameterCodec) + + exec, err := remotecommand.NewSPDYExecutor(restConfig, http.MethodPost, req.URL()) + return exec, trace.Wrap(err) +} diff --git a/lib/benchmark/linear.go b/lib/benchmark/linear.go index 396ad090d7e11..38d3809c4eb13 100644 --- a/lib/benchmark/linear.go +++ b/lib/benchmark/linear.go @@ -42,7 +42,6 @@ func (lg *Linear) GetBenchmark() *Config { MinimumWindow: lg.MinimumWindow, MinimumMeasurements: lg.MinimumMeasurements, Rate: lg.currentRPS, - Command: lg.config.Command, } if lg.currentRPS < lg.LowerBound { diff --git a/lib/benchmark/linear_test.go b/lib/benchmark/linear_test.go index 60055d2fc4a2c..6e38864829340 100644 --- a/lib/benchmark/linear_test.go +++ b/lib/benchmark/linear_test.go @@ -26,8 +26,6 @@ import ( func TestGetBenchmark(t *testing.T) { initial := &Config{ Rate: 0, - Command: []string{"ls"}, - Interactive: false, MinimumWindow: time.Second * 30, MinimumMeasurements: 1000, } @@ -53,8 +51,6 @@ func TestGetBenchmark(t *testing.T) { func TestGetBenchmarkNotEvenMultiple(t *testing.T) { initial := &Config{ Rate: 0, - Command: []string{"ls"}, - Interactive: false, MinimumWindow: time.Second * 30, MinimumMeasurements: 1000, } diff --git a/tool/tsh/tsh.go b/tool/tsh/tsh.go index bc1af4ed2eab3..69da277e429b4 100644 --- a/tool/tsh/tsh.go +++ b/tool/tsh/tsh.go @@ -860,17 +860,30 @@ func Run(ctx context.Context, args []string, opts ...cliOption) error { // bench bench := app.Command("bench", "Run shell or execute a command on a remote SSH node").Hidden() bench.Flag("cluster", clusterHelp).Short('c').StringVar(&cf.SiteName) - bench.Arg("[user@]host", "Remote hostname and the login to use").Required().StringVar(&cf.UserHost) - bench.Arg("command", "Command to execute on a remote 
host").Required().StringsVar(&cf.RemoteCommand) - bench.Flag("port", "SSH port on a remote host").Short('p').Int32Var(&cf.NodePort) bench.Flag("duration", "Test duration").Default("1s").DurationVar(&cf.BenchDuration) bench.Flag("rate", "Requests per second rate").Default("10").IntVar(&cf.BenchRate) - bench.Flag("interactive", "Create interactive SSH session").BoolVar(&cf.BenchInteractive) bench.Flag("export", "Export the latency profile").BoolVar(&cf.BenchExport) bench.Flag("path", "Directory to save the latency profile to, default path is the current directory").Default(".").StringVar(&cf.BenchExportPath) bench.Flag("ticks", "Ticks per half distance").Default("100").Int32Var(&cf.BenchTicks) bench.Flag("scale", "Value scale in which to scale the recorded values").Default("1.0").Float64Var(&cf.BenchValueScale) + benchSSH := bench.Command("ssh", "Run SSH benchmark test") + benchSSH.Arg("[user@]host", "Remote hostname and the login to use").Required().StringVar(&cf.UserHost) + benchSSH.Arg("command", "Command to execute on a remote host").Required().StringsVar(&cf.RemoteCommand) + benchSSH.Flag("port", "SSH port on a remote host").Short('p').Int32Var(&cf.NodePort) + benchSSH.Flag("interactive", "Create interactive SSH session").BoolVar(&cf.BenchInteractive) + var benchKubeOpts benchKubeOptions + benchKube := bench.Command("kube", "Run Kube benchmark test") + benchKube.Flag("kube-namespace", "Selects the ").Default("default").StringVar(&benchKubeOpts.namespace) + benchListKube := benchKube.Command("ls", "Run a benchmark test to list Pods") + benchListKube.Arg("kube_cluster", "Kubernetes cluster to use").Required().StringVar(&cf.KubernetesCluster) + benchExecKube := benchKube.Command("exec", "Run a benchmark test to exec into the specified Pod") + benchExecKube.Arg("kube_cluster", "Kubernetes cluster to use").Required().StringVar(&cf.KubernetesCluster) + benchExecKube.Arg("pod", "Pod name to exec into").Required().StringVar(&benchKubeOpts.pod) + 
// benchKubeOptions holds the CLI flag and argument values specific to the
// "tsh bench kube" family of subcommands.
type benchKubeOptions struct {
	// pod is the target pod name for exec benchmarks.
	pod string
	// container selects the container within the pod; empty uses the default.
	container string
	// namespace is the Kubernetes namespace to target.
	namespace string
}
nil { return trace.Wrap(err) } cnf := benchmark.Config{ - Command: cf.RemoteCommand, MinimumWindow: cf.BenchDuration, Rate: cf.BenchRate, } - result, err := cnf.Benchmark(cf.Context, tc) + result, err := cnf.Benchmark(cf.Context, tc, suite) if err != nil { fmt.Fprintln(os.Stderr, utils.UserMessageFromError(err)) return trace.Wrap(&common.ExitCodeError{Code: 255})