From 8a59c039b50ac56e965e53d798c34da40182826f Mon Sep 17 00:00:00 2001 From: Lorenzo Fontana Date: Tue, 11 Dec 2018 09:39:05 +0100 Subject: [PATCH] feat(cmd/log): initial implementation of the `kubectl trace logs` command Signed-off-by: Lorenzo Fontana --- go.mod | 4 +- go.sum | 8 +- pkg/cmd/log.go | 150 +++++++++++++++++++++ pkg/cmd/trace.go | 1 + pkg/logs/logs.go | 94 +++++++++++++ vendor/golang.org/x/net/http2/transport.go | 25 +++- vendor/modules.txt | 4 +- 7 files changed, 276 insertions(+), 10 deletions(-) create mode 100644 pkg/cmd/log.go create mode 100644 pkg/logs/logs.go diff --git a/go.mod b/go.mod index 08706230..4f7c12a4 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,7 @@ require ( github.com/spf13/pflag v1.0.3 github.com/stevvooe/resumable v0.0.0-20180830230917-22b14a53ba50 // indirect golang.org/x/crypto v0.0.0-20181112202954-3d3f9f413869 // indirect - golang.org/x/net v0.0.0-20181114220301-adae6a3d119a // indirect + golang.org/x/net v0.0.0-20181201002055-351d144fa1fc // indirect golang.org/x/oauth2 v0.0.0-20181120190819-8f65e3013eba // indirect golang.org/x/sync v0.0.0-20181108010431-42b317875d0f // indirect golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b // indirect @@ -53,7 +53,7 @@ require ( k8s.io/client-go v9.0.0+incompatible k8s.io/klog v0.1.0 // indirect k8s.io/kube-openapi v0.0.0-20181114233023-0317810137be // indirect - k8s.io/kubernetes v1.12.2 + k8s.io/kubernetes v1.12.3 k8s.io/utils v0.0.0-20181115163542-0d26856f57b3 // indirect sigs.k8s.io/yaml v1.1.0 // indirect ) diff --git a/go.sum b/go.sum index 996cfd75..b5779568 100644 --- a/go.sum +++ b/go.sum @@ -105,8 +105,8 @@ golang.org/x/crypto v0.0.0-20181112202954-3d3f9f413869/go.mod h1:6SG95UA2DQfeDnf golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181005035420-146acd28ed58/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20181114220301-adae6a3d119a h1:gOpx8G595UYyvj8UK4+OFyY4rx037g3fmfhe5SasG3U= -golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20181201002055-351d144fa1fc h1:a3CU5tJYVj92DY2LaA1kUkrsqD5/3mLDhx2NcNqyW+0= +golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/oauth2 v0.0.0-20181120190819-8f65e3013eba h1:YDkOrzGLLYybtuP6ZgebnO4OWYEYVMFSniazXsxrFN8= golang.org/x/oauth2 v0.0.0-20181120190819-8f65e3013eba/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -151,8 +151,8 @@ k8s.io/klog v0.1.0 h1:I5HMfc/DtuVaGR1KPwUrTc476K8NCqNBldC7H4dYEzk= k8s.io/klog v0.1.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk= k8s.io/kube-openapi v0.0.0-20181114233023-0317810137be h1:aWEq4nbj7HRJ0mtKYjNSk/7X28Tl6TI6FeG8gKF+r7Q= k8s.io/kube-openapi v0.0.0-20181114233023-0317810137be/go.mod h1:BXM9ceUBTj2QnfH2MK1odQs778ajze1RxcmP6S8RVVc= -k8s.io/kubernetes v1.12.2 h1:JEj2cxR+5T31U4klP5hI5dxjr6udRIHm+4dzCEPk498= -k8s.io/kubernetes v1.12.2/go.mod h1:ocZa8+6APFNC2tX1DZASIbocyYT5jHzqFVsY5aoB7Jk= +k8s.io/kubernetes v1.12.3 h1:1FH8TXY6pIuOHoNlsq9bB6rmCP0vfvQZH5YN7OXCHXU= +k8s.io/kubernetes v1.12.3/go.mod h1:ocZa8+6APFNC2tX1DZASIbocyYT5jHzqFVsY5aoB7Jk= k8s.io/utils v0.0.0-20181115163542-0d26856f57b3 h1:S3/Kq185JnolOEemhmDXXd23l2t4bX5hPQPQPADlF1E= k8s.io/utils v0.0.0-20181115163542-0d26856f57b3/go.mod h1:8k8uAuAQ0rXslZKaEWd0c3oVhZz7sSzSiPnVZayjIX0= sigs.k8s.io/yaml v1.1.0 h1:4A07+ZFc2wgJwo8YNlQpr1rVlgUDlxXHhPJciaPY5gs= diff --git a/pkg/cmd/log.go b/pkg/cmd/log.go new file mode 100644 index 00000000..7dd230a5 --- /dev/null +++ b/pkg/cmd/log.go @@ -0,0 +1,150 @@ +package cmd + +import ( + "context" + "fmt" + + "github.com/fntlnz/kubectl-trace/pkg/factory" + "github.com/fntlnz/kubectl-trace/pkg/logs" + "github.com/fntlnz/kubectl-trace/pkg/meta" + "github.com/fntlnz/kubectl-trace/pkg/signals" + "github.com/fntlnz/kubectl-trace/pkg/tracejob" + "github.com/spf13/cobra" + "k8s.io/apimachinery/pkg/types" + "k8s.io/cli-runtime/pkg/genericclioptions" + batchv1client "k8s.io/client-go/kubernetes/typed/batch/v1" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/rest" +) + +var ( + logShort = `` // Wrap with i18n.T() + logLong = logShort + ` + +...` + + logExamples = ` + # ... + %[1]s trace log -h + + # ... + %[1]s trace log` +) + +// LogOptions ... +type LogOptions struct { + genericclioptions.IOStreams + traceID *types.UID + traceName *string + namespace string + clientConfig *rest.Config +} + +// NewLogOptions provides an instance of LogOptions with default values. +func NewLogOptions(streams genericclioptions.IOStreams) *LogOptions { + return &LogOptions{ + IOStreams: streams, + } +} + +// NewLogCommand provides the log command wrapping LogOptions. +func NewLogCommand(factory factory.Factory, streams genericclioptions.IOStreams) *cobra.Command { + o := NewLogOptions(streams) + + cmd := &cobra.Command{ + Use: "log (TRACE_ID | TRACE_NAME)", + DisableFlagsInUseLine: true, + Short: logShort, + Long: logLong, // Wrap with templates.LongDesc() + Example: fmt.Sprintf(logExamples, "kubectl"), // Wrap with templates.Examples() + PreRunE: func(c *cobra.Command, args []string) error { + return o.Validate(c, args) + }, + RunE: func(c *cobra.Command, args []string) error { + if err := o.Complete(factory, c, args); err != nil { + return err + } + if err := o.Run(); err != nil { + fmt.Fprintln(o.ErrOut, err.Error()) + return nil + } + return nil + }, + } + + return cmd +} + +func (o *LogOptions) Validate(cmd *cobra.Command, args []string) error { + switch len(args) { + case 1: + if meta.IsObjectName(args[0]) { + o.traceName = &args[0] + } else { + tid := types.UID(args[0]) + o.traceID = &tid + } + break + default: + return fmt.Errorf("(TRACE_ID | TRACE_NAME) is a required argument for the log command") + } + + return nil +} + +func (o *LogOptions) Complete(factory factory.Factory, cmd *cobra.Command, args []string) error { + // Prepare namespace + var err error + o.namespace, _, err = factory.ToRawKubeConfigLoader().Namespace() + if err != nil { + return err + } + + // Prepare client + o.clientConfig, err = factory.ToRESTConfig() + if err != nil { + return err + } + + return nil +} + +func (o *LogOptions) Run() error { + jobsClient, err := batchv1client.NewForConfig(o.clientConfig) + if err != nil { + return err + } + + client, err := corev1client.NewForConfig(o.clientConfig) + if err != nil { + return err + } + + tc := &tracejob.TraceJobClient{ + JobClient: jobsClient.Jobs(o.namespace), + } + + tf := tracejob.TraceJobFilter{ + Name: o.traceName, + ID: o.traceID, + } + + jobs, err := tc.GetJob(tf) + + if err != nil { + return err + } + + if len(jobs) == 0 { + return fmt.Errorf("no trace found with the provided criterias") + } + + job := jobs[0] + + ctx := context.Background() + ctx = signals.WithStandardSignals(ctx) + nl := logs.NewLogs(client, o.IOStreams) + nl.WithContext(ctx) + nl.Run(job.ID, job.Namespace) + return nil +} diff --git a/pkg/cmd/trace.go b/pkg/cmd/trace.go index f9c7d16b..79e43ec6 100644 --- a/pkg/cmd/trace.go +++ b/pkg/cmd/trace.go @@ -74,6 +74,7 @@ func NewTraceCommand(streams genericclioptions.IOStreams) *cobra.Command { cmd.AddCommand(NewAttachCommand(f, streams)) cmd.AddCommand(NewDeleteCommand(f, streams)) cmd.AddCommand(NewVersionCommand(streams)) + cmd.AddCommand(NewLogCommand(f, streams)) return cmd } diff --git a/pkg/logs/logs.go b/pkg/logs/logs.go new file mode 100644 index 00000000..ba2989aa --- /dev/null +++ b/pkg/logs/logs.go @@ -0,0 +1,94 @@ +package logs + +import ( + "github.com/fntlnz/kubectl-trace/pkg/meta" + tcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/rest" + + "fmt" + "io" + + "context" + + "k8s.io/cli-runtime/pkg/genericclioptions" +) + +type Logs struct { + genericclioptions.IOStreams + coreV1Client tcorev1.CoreV1Interface + ctx context.Context +} + +func NewLogs(client tcorev1.CoreV1Interface, streams genericclioptions.IOStreams) *Logs { + return &Logs{ + coreV1Client: client, + IOStreams: streams, + ctx: context.TODO(), + } +} + +const ( + podNotFoundError = "no trace found to get logs from with the given selector" + podPhaseNotAcceptedError = "cannot get logs from a completed trace; current phase is %s" + invalidPodContainersSizeError = "unexpected number of containers in trace job pod" +) + +func (l *Logs) WithContext(c context.Context) { + l.ctx = c +} + +func (l *Logs) Run(jobID types.UID, namespace string) error { + pl, err := l.coreV1Client.Pods(namespace).List(metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s", meta.TraceIDLabelKey, jobID), + }) + + if err != nil { + return err + } + + if len(pl.Items) == 0 { + return fmt.Errorf(podNotFoundError) + } + + pod := &pl.Items[0] + if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed { + return fmt.Errorf(podPhaseNotAcceptedError, pod.Status.Phase) + } + + if len(pod.Spec.Containers) != 1 { + return fmt.Errorf(invalidPodContainersSizeError) + } + + containerName := pod.Spec.Containers[0].Name + + // TODO(fntlnz): let the user choose to follow or not + logOptions := &corev1.PodLogOptions{ + Container: containerName, + Follow: true, + Previous: false, + Timestamps: false, + } + + logsRequest := l.coreV1Client.Pods(namespace).GetLogs(pod.Name, logOptions) + + go consumeRequest(logsRequest, l.IOStreams.Out) + <-l.ctx.Done() + + return nil +} + +func consumeRequest(request *rest.Request, out io.Writer) error { + readCloser, err := request.Stream() + if err != nil { + return err + } + defer readCloser.Close() + + _, err = io.Copy(out, readCloser) + return err +} + diff --git a/vendor/golang.org/x/net/http2/transport.go b/vendor/golang.org/x/net/http2/transport.go index 3fe29188..f272e8f9 100644 --- a/vendor/golang.org/x/net/http2/transport.go +++ b/vendor/golang.org/x/net/http2/transport.go @@ -97,6 +97,16 @@ type Transport struct { // to mean no limit. MaxHeaderListSize uint32 + // StrictMaxConcurrentStreams controls whether the server's + // SETTINGS_MAX_CONCURRENT_STREAMS should be respected + // globally. If false, new TCP connections are created to the + // server as needed to keep each under the per-connection + // SETTINGS_MAX_CONCURRENT_STREAMS limit. If true, the + // server's SETTINGS_MAX_CONCURRENT_STREAMS is interpreted as + // a global limit and callers of RoundTrip block when needed, + // waiting for their turn. + StrictMaxConcurrentStreams bool + // t1, if non-nil, is the standard library Transport using // this transport. Its settings are used (but not its // RoundTrip method, etc). @@ -711,8 +721,19 @@ func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) { if cc.singleUse && cc.nextStreamID > 1 { return } - st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && - int64(cc.nextStreamID)+int64(cc.pendingRequests) < math.MaxInt32 + var maxConcurrentOkay bool + if cc.t.StrictMaxConcurrentStreams { + // We'll tell the caller we can take a new request to + // prevent the caller from dialing a new TCP + // connection, but then we'll block later before + // writing it. + maxConcurrentOkay = true + } else { + maxConcurrentOkay = int64(len(cc.streams)+1) < int64(cc.maxConcurrentStreams) + } + + st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && maxConcurrentOkay && + int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 st.freshConn = cc.nextStreamID == 1 && st.canTakeNewRequest return } diff --git a/vendor/modules.txt b/vendor/modules.txt index 332f1703..d8f6c8bc 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -83,7 +83,7 @@ github.com/spf13/cobra github.com/spf13/pflag # golang.org/x/crypto v0.0.0-20181112202954-3d3f9f413869 golang.org/x/crypto/ssh/terminal -# golang.org/x/net v0.0.0-20181114220301-adae6a3d119a +# golang.org/x/net v0.0.0-20181201002055-351d144fa1fc golang.org/x/net/http2 golang.org/x/net/http/httpguts golang.org/x/net/http2/hpack @@ -273,7 +273,7 @@ k8s.io/client-go/tools/clientcmd/api/v1 # k8s.io/kube-openapi v0.0.0-20181114233023-0317810137be k8s.io/kube-openapi/pkg/util/proto k8s.io/kube-openapi/pkg/util/proto/validation -# k8s.io/kubernetes v1.12.2 +# k8s.io/kubernetes v1.12.3 k8s.io/kubernetes/pkg/kubectl/util/term k8s.io/kubernetes/pkg/kubectl/cmd/util/openapi k8s.io/kubernetes/pkg/kubectl/cmd/util/openapi/validation