Skip to content
This repository has been archived by the owner on Dec 9, 2022. It is now read-only.

Commit

Permalink
Move monitor and log tailing to watch
Browse files Browse the repository at this point in the history
  • Loading branch information
me committed Jan 1, 2018
1 parent 8a48977 commit 55dc093
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 123 deletions.
141 changes: 36 additions & 105 deletions cli/pipeline/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,16 @@
package pipeline

import (
"bufio"
"context"
"errors"
"fmt"
"github.com/spf13/cobra"
"io"
"io/ioutil"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
k8errors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"log"
"time"
Expand Down Expand Up @@ -87,8 +86,7 @@ func runPipeline(path string, flags *runCmdFlagsStruct) {
}
err = runPipelineStep(pipeline, &step, flags)
if err != nil {
time.Sleep(10 * time.Second)
log.Fatalf("pipeline step failed: %s", err.Error())
log.Fatalf("[paddle] %s", err.Error())
}
}
}
Expand All @@ -111,129 +109,62 @@ func runPipelineStep(pipeline *PipelineDefinition, step *PipelineDefinitionStep,
return err
}

stopWatching := make(chan bool)
defer close(stopWatching)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

watcher, err := NewPodWatcher(clientset, pod, stopWatching)
watch, err := Watch(ctx, clientset, pod)
if err != nil {
return err
}

_, err = watchLogs(clientset, pod)
if err != nil {
return fmt.Errorf("Parsing logs failed: %s", err.Error())
}
containers := make(map[string]bool)

for {
event, ok := <-watcher
if !ok {
stopWatching <- true
return fmt.Errorf("pod %s channel has been closed ", pod.Name)
}
switch event.Object.(type) {
case *v1.Pod:
eventPod := event.Object.(*v1.Pod)
switch event.Type {
case watch.Added, watch.Modified:
if eventPod.Status.Phase == v1.PodSucceeded {
watcher = nil
break
}
if eventPod.Status.Phase == v1.PodFailed {
stopWatching <- true
return fmt.Errorf("pod failed: '%s'", eventPod.Status.Message)
e := <-watch
switch e.Type {
case Added:
log.Printf("[paddle] Container %s/%s starting", pod.Name, e.Container)
containers[e.Container] = true
TailLogs(ctx, clientset, e.Pod, e.Container)
case Deleted:
case Removed:
log.Printf("[paddle] Container removed: %s", e.Container)
continue
case Completed:
log.Printf("[paddle] Pod execution completed")
return nil
case Failed:
var msg string
if e.Container != "" {
if e.Message != "" {
msg = fmt.Sprintf("Container %s/%s failed: '%s'", pod.Name, e.Container, e.Message)
} else {
msg = fmt.Sprintf("Container %s/%s failed", pod.Name, e.Container)
}
for i := 0; i < len(eventPod.Status.ContainerStatuses); i++ {
containerStatus := eventPod.Status.ContainerStatuses[i]
term := containerStatus.State.Terminated
if term != nil && term.ExitCode != 0 {
return fmt.Errorf("pod container %s exited with error %s", containerStatus.Name, term.Message)
}
_, present := containers[e.Container]
if !present { // container died before being added
TailLogs(ctx, clientset, e.Pod, e.Container)
time.Sleep(3 * time.Second) // give it time to tail logs
}
case watch.Deleted:
stopWatching <- true
return fmt.Errorf("pod deleted")
case watch.Error:
stopWatching <- true
return fmt.Errorf("pod error")
} else {
msg = "Pod failed"
}
}
if watcher == nil {
break
return errors.New(msg)
}
}

stopWatching <- true

err = pods.Delete(podDefinition.PodName, &metav1.DeleteOptions{})
if err != nil {
return err
}
log.Printf("[paddle] Finishing pod execution")
return nil
}

func watchLogs(client *kubernetes.Clientset, pod *v1.Pod) (<-chan string, error) {
logCh := make(chan string, 30)

for i := 0; i < len(pod.Spec.Containers); i++ {
go func(i int) {
container := pod.Spec.Containers[i]
readCloser, err := client.Core().Pods(pod.Namespace).GetLogs(pod.Name, &v1.PodLogOptions{
Container: container.Name,
Follow: true,
}).Stream()

for {

opened := false
if err != nil {
// if errors.IsNotFound(err) || errors.IsInvalid(err) {
log.Printf("notfound %s-%s, {}", pod.Name, container.Name, err)
time.Sleep(1 * time.Second)
// } else {
// log.Printf("Error reading log: %s", err.Error())
// return
// }
} else {
log.Printf("opened %s", container.Name)
opened = true
}
if opened {
break
}
}

log.Printf("Ok here")

defer readCloser.Close()
reader := bufio.NewReader(readCloser)

for {
log.Printf("Reading line for %s", container.Name)
line, err := reader.ReadBytes('\n')
if err != nil {
if err != io.EOF {
log.Printf("Error reading log line: %s", err.Error())
}
return
}
str := string(line)
log.Printf("[paddle] [%s] %s", container.Name, str)
}
}(i)
}
return logCh, nil
}

func deleteAndWait(c *kubernetes.Clientset, podDefinition *PodDefinition) error {
pods := clientset.CoreV1().Pods(podDefinition.Namespace)
deleting := false
err := wait.PollImmediate(defaultPollInterval, defaultTimeout, func() (bool, error) {
var err error
err = pods.Delete(podDefinition.PodName, &metav1.DeleteOptions{})
if err != nil {
if errors.IsNotFound(err) {
if k8errors.IsNotFound(err) {
return true, nil
} else {
return true, err
Expand Down
131 changes: 113 additions & 18 deletions cli/pipeline/watch.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,35 @@
package pipeline

import (
"bufio"
"context"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"log"
)

func NewPodWatcher(c *kubernetes.Clientset, pod *v1.Pod, stopChannel chan bool) (<-chan watch.Event, error) {
podSelector, err := fields.ParseSelector("metadata.name=" + pod.Name)
type WatchEventType string

const (
Added WatchEventType = "ADDED"
Deleted WatchEventType = "DELETED"
Removed WatchEventType = "REMOVED"
Completed WatchEventType = "COMPLETED"
Failed WatchEventType = "FAILED"
)

type WatchEvent struct {
Type WatchEventType
Pod *v1.Pod
Container string
Message string
}

func Watch(ctx context.Context, c *kubernetes.Clientset, watchPod *v1.Pod) (<-chan WatchEvent, error) {
podSelector, err := fields.ParseSelector("metadata.name=" + watchPod.Name)
if err != nil {
return nil, err
}
Expand All @@ -18,31 +38,106 @@ func NewPodWatcher(c *kubernetes.Clientset, pod *v1.Pod, stopChannel chan bool)
Watch: true,
}

podWatch, err := c.CoreV1().Pods(pod.Namespace).Watch(options)
watcher, err := c.CoreV1().Pods(watchPod.Namespace).Watch(options)

out := make(chan WatchEvent)

eventCh := make(chan watch.Event, 30)
containers := make(map[string]bool)

go func() {
defer podWatch.Stop()
defer close(eventCh)
var podWatchChannelClosed bool
for {
select {
case _ = <-stopChannel:
return
case e := <-watcher.ResultChan():
if e.Object == nil {
// Closed because of error
return
}

case podEvent, ok := <-podWatch.ResultChan():
if !ok {
podWatchChannelClosed = true
} else {
eventCh <- podEvent
pod := e.Object.(*v1.Pod)

switch e.Type {
case watch.Added, watch.Modified:
if pod.Status.Phase == v1.PodSucceeded {
out <- WatchEvent{Completed, pod, "", ""}
} else if pod.Status.Phase == v1.PodFailed {
out <- WatchEvent{Failed, pod, "", ""}
} else {
for _, container := range pod.Status.ContainerStatuses {
if container.State.Running != nil {
_, present := containers[container.Name]
if !present {
out <- WatchEvent{Added, pod, container.Name, ""}
containers[container.Name] = true
}
} else if container.State.Terminated != nil {
_, present := containers[container.Name]
if present {
out <- WatchEvent{Removed, pod, container.Name, ""}
containers[container.Name] = false
}
if container.State.Terminated.ExitCode != 0 {
out <- WatchEvent{Failed, pod, container.Name, container.State.Terminated.Message}
}
}
}
}
case watch.Deleted:
out <- WatchEvent{Deleted, pod, "", ""}
case watch.Error:
log.Printf("Pod error")
}
}
if podWatchChannelClosed {
break
case <-ctx.Done():
watcher.Stop()
close(out)
return
}
}
}()

return eventCh, nil
return out, nil
}

func TailLogs(ctx context.Context, c *kubernetes.Clientset, pod *v1.Pod, container string) {
pods := c.Core().Pods(pod.Namespace)

req := pods.GetLogs(pod.Name, &v1.PodLogOptions{
Container: container,
Follow: true,
})

closed := make(chan struct{})

go func() {

stream, err := req.Stream()

if err != nil {
log.Fatalf("Error opening log stream for pod %s", pod.Name)
}

defer stream.Close()

go func() {
<-closed
stream.Close()
}()

go func() {
<-ctx.Done()
close(closed)
}()

reader := bufio.NewReader(stream)

for {
line, err := reader.ReadBytes('\n')
if err != nil {
return
}

str := string(line)

log.Printf("[%s/%s]: %s", pod.Name, container, str)
}
}()
}

0 comments on commit 55dc093

Please sign in to comment.