diff --git a/cli/pipeline/run.go b/cli/pipeline/run.go index 27b10f4..361112b 100644 --- a/cli/pipeline/run.go +++ b/cli/pipeline/run.go @@ -14,17 +14,16 @@ package pipeline import ( - "bufio" + "context" + "errors" "fmt" "github.com/spf13/cobra" - "io" "io/ioutil" "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" + k8errors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/yaml" - "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "log" "time" @@ -87,8 +86,7 @@ func runPipeline(path string, flags *runCmdFlagsStruct) { } err = runPipelineStep(pipeline, &step, flags) if err != nil { - time.Sleep(10 * time.Second) - log.Fatalf("pipeline step failed: %s", err.Error()) + log.Fatalf("[paddle] %s", err.Error()) } } } @@ -111,121 +109,54 @@ func runPipelineStep(pipeline *PipelineDefinition, step *PipelineDefinitionStep, return err } - stopWatching := make(chan bool) - defer close(stopWatching) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - watcher, err := NewPodWatcher(clientset, pod, stopWatching) + watch, err := Watch(ctx, clientset, pod) if err != nil { return err } - _, err = watchLogs(clientset, pod) - if err != nil { - return fmt.Errorf("Parsing logs failed: %s", err.Error()) - } + containers := make(map[string]bool) for { - event, ok := <-watcher - if !ok { - stopWatching <- true - return fmt.Errorf("pod %s channel has been closed ", pod.Name) - } - switch event.Object.(type) { - case *v1.Pod: - eventPod := event.Object.(*v1.Pod) - switch event.Type { - case watch.Added, watch.Modified: - if eventPod.Status.Phase == v1.PodSucceeded { - watcher = nil - break - } - if eventPod.Status.Phase == v1.PodFailed { - stopWatching <- true - return fmt.Errorf("pod failed: '%s'", eventPod.Status.Message) + e := <-watch + switch e.Type { + case Added: + log.Printf("[paddle] Container %s/%s starting", pod.Name, e.Container) + containers[e.Container] = true + TailLogs(ctx, clientset, e.Pod, e.Container) + case Deleted: + case Removed: + log.Printf("[paddle] Container removed: %s", e.Container) + continue + case Completed: + log.Printf("[paddle] Pod execution completed") + return nil + case Failed: + var msg string + if e.Container != "" { + if e.Message != "" { + msg = fmt.Sprintf("Container %s/%s failed: '%s'", pod.Name, e.Container, e.Message) + } else { + msg = fmt.Sprintf("Container %s/%s failed", pod.Name, e.Container) } - for i := 0; i < len(eventPod.Status.ContainerStatuses); i++ { - containerStatus := eventPod.Status.ContainerStatuses[i] - term := containerStatus.State.Terminated - if term != nil && term.ExitCode != 0 { - return fmt.Errorf("pod container %s exited with error %s", containerStatus.Name, term.Message) - } + _, present := containers[e.Container] + if !present { // container died before being added + TailLogs(ctx, clientset, e.Pod, e.Container) + time.Sleep(3 * time.Second) // give it time to tail logs } - case watch.Deleted: - stopWatching <- true - return fmt.Errorf("pod deleted") - case watch.Error: - stopWatching <- true - return fmt.Errorf("pod error") + } else { + msg = "Pod failed" } - } - if watcher == nil { - break + return errors.New(msg) } } - stopWatching <- true - - err = pods.Delete(podDefinition.PodName, &metav1.DeleteOptions{}) - if err != nil { - return err - } log.Printf("[paddle] Finishing pod execution") return nil } -func watchLogs(client *kubernetes.Clientset, pod *v1.Pod) (<-chan string, error) { - logCh := make(chan string, 30) - - for i := 0; i < len(pod.Spec.Containers); i++ { - go func(i int) { - container := pod.Spec.Containers[i] - readCloser, err := client.Core().Pods(pod.Namespace).GetLogs(pod.Name, &v1.PodLogOptions{ - Container: container.Name, - Follow: true, - }).Stream() - - for { - - opened := false - if err != nil { - // if errors.IsNotFound(err) || errors.IsInvalid(err) { - log.Printf("notfound %s-%s, {}", pod.Name, container.Name, err) - time.Sleep(1 * time.Second) - // } else { - // log.Printf("Error reading log: %s", err.Error()) - // return - // } - } else { - log.Printf("opened %s", container.Name) - opened = true - } - if opened { - break - } - } - - log.Printf("Ok here") - - defer readCloser.Close() - reader := bufio.NewReader(readCloser) - - for { - log.Printf("Reading line for %s", container.Name) - line, err := reader.ReadBytes('\n') - if err != nil { - if err != io.EOF { - log.Printf("Error reading log line: %s", err.Error()) - } - return - } - str := string(line) - log.Printf("[paddle] [%s] %s", container.Name, str) - } - }(i) - } - return logCh, nil -} - func deleteAndWait(c *kubernetes.Clientset, podDefinition *PodDefinition) error { pods := clientset.CoreV1().Pods(podDefinition.Namespace) deleting := false @@ -233,7 +164,7 @@ func deleteAndWait(c *kubernetes.Clientset, podDefinition *PodDefinition) error var err error err = pods.Delete(podDefinition.PodName, &metav1.DeleteOptions{}) if err != nil { - if errors.IsNotFound(err) { + if k8errors.IsNotFound(err) { return true, nil } else { return true, err diff --git a/cli/pipeline/watch.go b/cli/pipeline/watch.go index 7873c77..c3edc41 100644 --- a/cli/pipeline/watch.go +++ b/cli/pipeline/watch.go @@ -1,15 +1,35 @@ package pipeline import ( + "bufio" + "context" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" + "log" ) -func NewPodWatcher(c *kubernetes.Clientset, pod *v1.Pod, stopChannel chan bool) (<-chan watch.Event, error) { - podSelector, err := fields.ParseSelector("metadata.name=" + pod.Name) +type WatchEventType string + +const ( + Added WatchEventType = "ADDED" + Deleted WatchEventType = "DELETED" + Removed WatchEventType = "REMOVED" + Completed WatchEventType = "COMPLETED" + Failed WatchEventType = "FAILED" +) + +type WatchEvent struct { + Type WatchEventType + Pod *v1.Pod + Container string + Message string +} + +func Watch(ctx context.Context, c *kubernetes.Clientset, watchPod *v1.Pod) (<-chan WatchEvent, error) { + podSelector, err := fields.ParseSelector("metadata.name=" + watchPod.Name) if err != nil { return nil, err } @@ -18,31 +38,106 @@ func NewPodWatcher(c *kubernetes.Clientset, pod *v1.Pod, stopChannel chan bool) Watch: true, } - podWatch, err := c.CoreV1().Pods(pod.Namespace).Watch(options) + watcher, err := c.CoreV1().Pods(watchPod.Namespace).Watch(options) + + out := make(chan WatchEvent) - eventCh := make(chan watch.Event, 30) + containers := make(map[string]bool) go func() { - defer podWatch.Stop() - defer close(eventCh) - var podWatchChannelClosed bool for { select { - case _ = <-stopChannel: - return + case e := <-watcher.ResultChan(): + if e.Object == nil { + // Closed because of error + return + } - case podEvent, ok := <-podWatch.ResultChan(): - if !ok { - podWatchChannelClosed = true - } else { - eventCh <- podEvent + pod := e.Object.(*v1.Pod) + + switch e.Type { + case watch.Added, watch.Modified: + if pod.Status.Phase == v1.PodSucceeded { + out <- WatchEvent{Completed, pod, "", ""} + } else if pod.Status.Phase == v1.PodFailed { + out <- WatchEvent{Failed, pod, "", ""} + } else { + for _, container := range pod.Status.ContainerStatuses { + if container.State.Running != nil { + _, present := containers[container.Name] + if !present { + out <- WatchEvent{Added, pod, container.Name, ""} + containers[container.Name] = true + } + } else if container.State.Terminated != nil { + _, present := containers[container.Name] + if present { + out <- WatchEvent{Removed, pod, container.Name, ""} + containers[container.Name] = false + } + if container.State.Terminated.ExitCode != 0 { + out <- WatchEvent{Failed, pod, container.Name, container.State.Terminated.Message} + } + } + } + } + case watch.Deleted: + out <- WatchEvent{Deleted, pod, "", ""} + case watch.Error: + log.Printf("Pod error") } - } - if podWatchChannelClosed { - break + case <-ctx.Done(): + watcher.Stop() + close(out) + return } } }() - return eventCh, nil + return out, nil +} + +func TailLogs(ctx context.Context, c *kubernetes.Clientset, pod *v1.Pod, container string) { + pods := c.Core().Pods(pod.Namespace) + + req := pods.GetLogs(pod.Name, &v1.PodLogOptions{ + Container: container, + Follow: true, + }) + + closed := make(chan struct{}) + + go func() { + + stream, err := req.Stream() + + if err != nil { + log.Fatalf("Error opening log stream for pod %s", pod.Name) + } + + defer stream.Close() + + go func() { + <-closed + stream.Close() + }() + + go func() { + <-ctx.Done() + close(closed) + }() + + reader := bufio.NewReader(stream) + + for { + line, err := reader.ReadBytes('\n') + if err != nil { + return + } + + str := string(line) + + log.Printf("[%s/%s]: %s", pod.Name, container, str) + } + }() }