diff --git a/cli/pipeline/run.go b/cli/pipeline/run.go index c36bd2c..27b10f4 100644 --- a/cli/pipeline/run.go +++ b/cli/pipeline/run.go @@ -14,8 +14,10 @@ package pipeline import ( + "bufio" "fmt" "github.com/spf13/cobra" + "io" "io/ioutil" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -29,7 +31,8 @@ import ( ) type runCmdFlagsStruct struct { - StepName string + StepName string + BucketName string } const defaultPollInterval = 5 * time.Second @@ -56,6 +59,7 @@ $ paddle pipeline run test_pipeline.yaml func init() { runCmdFlags = &runCmdFlagsStruct{} runCmd.Flags().StringVarP(&runCmdFlags.StepName, "step", "s", "", "Single step to execute") + runCmd.Flags().StringVarP(&runCmdFlags.BucketName, "bucket", "b", "", "Bucket name") config, err := getKubernetesConfig() if err != nil { @@ -73,30 +77,20 @@ func runPipeline(path string, flags *runCmdFlagsStruct) { panic(err.Error()) } pipeline := parsePipeline(data) - // namespace := pipeline.Namespace - // list, err := clientset.CoreV1().Pods(namespace).List(metav1.ListOptions{}) - // if err != nil { - // panic(err.Error()) - // } - // fmt.Printf("{}\n", list) + if flags.BucketName != "" { + pipeline.Bucket = flags.BucketName + } + for _, step := range pipeline.Steps { if flags.StepName != "" && step.Step != flags.StepName { continue } err = runPipelineStep(pipeline, &step, flags) if err != nil { + time.Sleep(10 * time.Second) log.Fatalf("pipeline step failed: %s", err.Error()) } } - // for _, step := range pipeline.Steps { - // stepPod := compilePodTemplate(pipeline, &step) - // decode := scheme.Codecs.UniversalDeserializer().Decode - // obj, groupVersionKind, err := decode([]byte(stepPod), nil, nil) - - // if err != nil { - // log.Fatal(fmt.Sprintf("Error while decoding YAML object. Err was: %s", err)) - // } - // } } func runPipelineStep(pipeline *PipelineDefinition, step *PipelineDefinitionStep, flags *runCmdFlagsStruct) error { @@ -125,6 +119,11 @@ func runPipelineStep(pipeline *PipelineDefinition, step *PipelineDefinitionStep, return err } + _, err = watchLogs(clientset, pod) + if err != nil { + return fmt.Errorf("Parsing logs failed: %s", err.Error()) + } + for { event, ok := <-watcher if !ok { @@ -139,8 +138,6 @@ func runPipelineStep(pipeline *PipelineDefinition, step *PipelineDefinitionStep, if eventPod.Status.Phase == v1.PodSucceeded { watcher = nil break - log.Printf("Post succeed") - } if eventPod.Status.Phase == v1.PodFailed { stopWatching <- true @@ -176,6 +173,59 @@ func runPipelineStep(pipeline *PipelineDefinition, step *PipelineDefinitionStep, return nil } +func watchLogs(client *kubernetes.Clientset, pod *v1.Pod) (<-chan string, error) { + logCh := make(chan string, 30) + + for i := 0; i < len(pod.Spec.Containers); i++ { + go func(i int) { + container := pod.Spec.Containers[i] + readCloser, err := client.Core().Pods(pod.Namespace).GetLogs(pod.Name, &v1.PodLogOptions{ + Container: container.Name, + Follow: true, + }).Stream() + + for { + + opened := false + if err != nil { + // if errors.IsNotFound(err) || errors.IsInvalid(err) { + log.Printf("notfound %s-%s, {}", pod.Name, container.Name, err) + time.Sleep(1 * time.Second) + // } else { + // log.Printf("Error reading log: %s", err.Error()) + // return + // } + } else { + log.Printf("opened %s", container.Name) + opened = true + } + if opened { + break + } + } + + log.Printf("Ok here") + + defer readCloser.Close() + reader := bufio.NewReader(readCloser) + + for { + log.Printf("Reading line for %s", container.Name) + line, err := reader.ReadBytes('\n') + if err != nil { + if err != io.EOF { + log.Printf("Error reading log line: %s", err.Error()) + } + return + } + str := string(line) + log.Printf("[paddle] [%s] %s", container.Name, str) + } + }(i) + } + return logCh, nil +} + func deleteAndWait(c *kubernetes.Clientset, podDefinition *PodDefinition) error { pods := clientset.CoreV1().Pods(podDefinition.Namespace) deleting := false diff --git a/cli/pipeline/template.go b/cli/pipeline/template.go index d8d9e31..2849e48 100644 --- a/cli/pipeline/template.go +++ b/cli/pipeline/template.go @@ -24,7 +24,7 @@ metadata: name: "{{ .PodName }}" namespace: {{ .Namespace }} labels: - pipeline: canoe + canoe: pipeline spec: restartPolicy: Never volumes: