Skip to content

Commit

Permalink
fix: Fixing log output and adding dry run args
Browse files Browse the repository at this point in the history
Signed-off-by: Vincent Boutour <[email protected]>
  • Loading branch information
ViBiOh committed Aug 10, 2022
1 parent 6ce7ca8 commit 7b6d3e1
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 23 deletions.
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
MIT License

Copyright (c) 2019-2022 ViBiOh
Copyright (c) 2022 ViBiOh

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
74 changes: 61 additions & 13 deletions cmd/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cmd

import (
"bufio"
"bytes"
"context"
"fmt"
"strings"
Expand All @@ -19,15 +20,25 @@ import (
)

var (
dryRun bool
since time.Duration
containers []string
)

var logCmd = &cobra.Command{
Use: "log <resource_type> <resource_name>",
Aliases: []string{"logs"},
Short: "Get logs of a given resource",
Args: cobra.ExactValidArgs(2),
Use: "log <resource_type> <resource_name>",
Aliases: []string{"logs"},
Short: "Get logs of a given resource",
ArgAliases: pod.ResourcesAliases,
ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
if len(args) == 0 {
return pod.Resources, cobra.ShellCompDirectiveNoFileComp
}

return nil, cobra.ShellCompDirectiveNoFileComp
},

Args: cobra.ExactValidArgs(2),
Run: func(cmd *cobra.Command, args []string) {
resourceType := args[0]
resourceName := args[1]
Expand Down Expand Up @@ -59,10 +70,13 @@ var logCmd = &cobra.Command{
}

streamCancel, ok := onGoingStreams[pod.UID]
if event.Type == watch.Deleted || pod.Status.Phase == v1.PodSucceeded {

if event.Type == watch.Deleted || pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
if ok {
streamCancel()
delete(onGoingStreams, pod.UID)
} else if pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
handlePod(ctx, onGoingStreams, streaming, kube, pod)
}

continue
Expand All @@ -87,6 +101,7 @@ func initLog() {

flags.DurationVarP(&since, "since", "s", time.Hour, "Display logs since given duration")
flags.StringSliceVarP(&containers, "containers", "c", nil, "Filter container's name, default to all containers")
flags.BoolVarP(&dryRun, "dry-run", "d", false, "Dry-run, print only pods")
}

func handlePod(ctx context.Context, onGoingStreams map[types.UID]func(), streaming *concurrent.Simple, kube client.Kube, pod *v1.Pod) {
Expand All @@ -98,19 +113,51 @@ func handlePod(ctx context.Context, onGoingStreams map[types.UID]func(), streami
if !isContainerSelected(container) {
continue
}

streamCtx, streamCancel := context.WithCancel(ctx)
onGoingStreams[pod.UID] = streamCancel
container := container

if dryRun {
kube.Info("%s %s", output.Green(fmt.Sprintf("[%s/%s]", pod.Name, container.Name)), output.Yellow("Found!"))
continue
}

streaming.Go(func() {
if pod.Status.Phase != v1.PodRunning {
logPod(ctx, kube, pod.Namespace, pod.Name, container.Name)
return
}

streamCtx, streamCancel := context.WithCancel(ctx)
onGoingStreams[pod.UID] = streamCancel
defer streamCancel()

streamPod(streamCtx, kube, pod.Namespace, pod.Name, container.Name)
})
}
}

func logPod(ctx context.Context, kube client.Kube, namespace, name, container string) {
sinceSeconds := int64(since.Seconds())

content, err := kube.CoreV1().Pods(namespace).GetLogs(name, &v1.PodLogOptions{
SinceSeconds: &sinceSeconds,
Container: container,
}).DoRaw(ctx)
if err != nil {
kube.Err("%s", err)
return
}

outputter := kube.Child(output.Green(fmt.Sprintf("[%s/%s]", name, container)))
defer outputter.Info(output.Yellow("Log ended."))

streamScanner := bufio.NewScanner(bytes.NewReader(content))
streamScanner.Split(bufio.ScanLines)

for streamScanner.Scan() {
outputter.Std(streamScanner.Text())
}
}

func streamPod(ctx context.Context, kube client.Kube, namespace, name, container string) {
sinceSeconds := int64(since.Seconds())

Expand All @@ -130,16 +177,17 @@ func streamPod(ctx context.Context, kube client.Kube, namespace, name, container
}
}()

outputter := kube.Child(output.Green(fmt.Sprintf("[%s/%s]", name, container)))

outputter.Info(output.Yellow("Streaming log..."))
defer outputter.Info(output.Yellow("Streaming ended."))

streamScanner := bufio.NewScanner(stream)
streamScanner.Split(bufio.ScanLines)

outputter := kube.Child(output.Green(fmt.Sprintf("[%s/%s]", name, container)))

for streamScanner.Scan() {
outputter.Std("%s", streamScanner.Text())
outputter.Std(streamScanner.Text())
}

outputter.Info("%s", output.Yellow("Stream ended."))
}

func isContainerSelected(container v1.Container) bool {
Expand Down
2 changes: 1 addition & 1 deletion pkg/output/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func Info(prefix, format string, args ...any) {
}

func Fatal(format string, args ...any) {
Err("", format+"\n", args...)
fmt.Fprint(os.Stderr, Red(fmt.Sprintf(format, args...)))
os.Exit(1)
}

Expand Down
9 changes: 3 additions & 6 deletions pkg/output/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ type event struct {

var (
done = make(chan struct{})
outputChan = make(chan event, 4)
outputChan = make(chan event, 8)
)

func init() {
go print()
}

func print() {
close(done)
defer close(done)

for outputEvent := range outputChan {
for _, line := range strings.Split(outputEvent.message, "\n") {
Expand All @@ -49,8 +49,5 @@ func Done() <-chan struct{} {
}

func outputContent(std bool, prefix, format string, args ...any) {
select {
case outputChan <- event{std: std, prefix: prefix, message: fmt.Sprintf(format, args...)}:
default:
}
outputChan <- event{std: std, prefix: prefix, message: fmt.Sprintf(format, args...)}
}
20 changes: 18 additions & 2 deletions pkg/pod/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,27 @@ import (
"strings"

"github.com/ViBiOh/kube/pkg/client"
"github.com/ViBiOh/kube/pkg/output"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
)

var Resources = []string{
"deployments",
"cronjobs",
"jobs",
"daemonsets",
}

var ResourcesAliases = []string{
"deploy",
"deployment",
"cj",
"cronjob",
"job",
"ds",
"daemonset",
}

type PodWatcher func(context.Context, client.Kube) (watch.Interface, error)

func WatcherLabelSelector(resourceType, resourceName string) PodWatcher {
Expand All @@ -27,7 +43,7 @@ func WatcherLabelSelector(resourceType, resourceName string) PodWatcher {
case "ds", "daemonset", "daemonsets":
labelGetter = getDaemonSetLabelSelector
default:
output.Fatal("unhandled resource type for log: %s", resourceType)
return nil, fmt.Errorf("unhandled resource type for log: %s", resourceType)
}

labelSelector, err := labelGetter(ctx, kube, resourceName)
Expand Down

0 comments on commit 7b6d3e1

Please sign in to comment.