Skip to content
This repository has been archived by the owner on Dec 9, 2022. It is now read-only.

Commit

Permalink
Initial watch logs implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
me committed Jan 1, 2018
1 parent 4415e63 commit 8a48977
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 19 deletions.
86 changes: 68 additions & 18 deletions cli/pipeline/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
package pipeline

import (
"bufio"
"fmt"
"github.com/spf13/cobra"
"io"
"io/ioutil"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -29,7 +31,8 @@ import (
)

type runCmdFlagsStruct struct {
StepName string
StepName string
BucketName string
}

const defaultPollInterval = 5 * time.Second
Expand All @@ -56,6 +59,7 @@ $ paddle pipeline run test_pipeline.yaml
func init() {
runCmdFlags = &runCmdFlagsStruct{}
runCmd.Flags().StringVarP(&runCmdFlags.StepName, "step", "s", "", "Single step to execute")
runCmd.Flags().StringVarP(&runCmdFlags.BucketName, "bucket", "b", "", "Bucket name")

config, err := getKubernetesConfig()
if err != nil {
Expand All @@ -73,30 +77,20 @@ func runPipeline(path string, flags *runCmdFlagsStruct) {
panic(err.Error())
}
pipeline := parsePipeline(data)
// namespace := pipeline.Namespace
// list, err := clientset.CoreV1().Pods(namespace).List(metav1.ListOptions{})
// if err != nil {
// panic(err.Error())
// }
// fmt.Printf("{}\n", list)
if flags.BucketName != "" {
pipeline.Bucket = flags.BucketName
}

for _, step := range pipeline.Steps {
if flags.StepName != "" && step.Step != flags.StepName {
continue
}
err = runPipelineStep(pipeline, &step, flags)
if err != nil {
time.Sleep(10 * time.Second)
log.Fatalf("pipeline step failed: %s", err.Error())
}
}
// for _, step := range pipeline.Steps {
// stepPod := compilePodTemplate(pipeline, &step)
// decode := scheme.Codecs.UniversalDeserializer().Decode
// obj, groupVersionKind, err := decode([]byte(stepPod), nil, nil)

// if err != nil {
// log.Fatal(fmt.Sprintf("Error while decoding YAML object. Err was: %s", err))
// }
// }
}

func runPipelineStep(pipeline *PipelineDefinition, step *PipelineDefinitionStep, flags *runCmdFlagsStruct) error {
Expand Down Expand Up @@ -125,6 +119,11 @@ func runPipelineStep(pipeline *PipelineDefinition, step *PipelineDefinitionStep,
return err
}

_, err = watchLogs(clientset, pod)
if err != nil {
return fmt.Errorf("Parsing logs failed: %s", err.Error())
}

for {
event, ok := <-watcher
if !ok {
Expand All @@ -139,8 +138,6 @@ func runPipelineStep(pipeline *PipelineDefinition, step *PipelineDefinitionStep,
if eventPod.Status.Phase == v1.PodSucceeded {
watcher = nil
break
log.Printf("Post succeed")

}
if eventPod.Status.Phase == v1.PodFailed {
stopWatching <- true
Expand Down Expand Up @@ -176,6 +173,59 @@ func runPipelineStep(pipeline *PipelineDefinition, step *PipelineDefinitionStep,
return nil
}

func watchLogs(client *kubernetes.Clientset, pod *v1.Pod) (<-chan string, error) {
logCh := make(chan string, 30)

for i := 0; i < len(pod.Spec.Containers); i++ {
go func(i int) {
container := pod.Spec.Containers[i]
readCloser, err := client.Core().Pods(pod.Namespace).GetLogs(pod.Name, &v1.PodLogOptions{
Container: container.Name,
Follow: true,
}).Stream()

for {

opened := false
if err != nil {
// if errors.IsNotFound(err) || errors.IsInvalid(err) {
log.Printf("notfound %s-%s, {}", pod.Name, container.Name, err)
time.Sleep(1 * time.Second)
// } else {
// log.Printf("Error reading log: %s", err.Error())
// return
// }
} else {
log.Printf("opened %s", container.Name)
opened = true
}
if opened {
break
}
}

log.Printf("Ok here")

defer readCloser.Close()
reader := bufio.NewReader(readCloser)

for {
log.Printf("Reading line for %s", container.Name)
line, err := reader.ReadBytes('\n')
if err != nil {
if err != io.EOF {
log.Printf("Error reading log line: %s", err.Error())
}
return
}
str := string(line)
log.Printf("[paddle] [%s] %s", container.Name, str)
}
}(i)
}
return logCh, nil
}

func deleteAndWait(c *kubernetes.Clientset, podDefinition *PodDefinition) error {
pods := clientset.CoreV1().Pods(podDefinition.Namespace)
deleting := false
Expand Down
2 changes: 1 addition & 1 deletion cli/pipeline/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ metadata:
name: "{{ .PodName }}"
namespace: {{ .Namespace }}
labels:
pipeline: canoe
canoe: pipeline
spec:
restartPolicy: Never
volumes:
Expand Down

0 comments on commit 8a48977

Please sign in to comment.