Skip to content
This repository has been archived by the owner on Dec 9, 2022. It is now read-only.

Commit

Permalink
Run and watch pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
me committed Dec 23, 2017
1 parent 081a0f4 commit 4415e63
Show file tree
Hide file tree
Showing 5 changed files with 227 additions and 50 deletions.
164 changes: 145 additions & 19 deletions cli/pipeline/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,29 @@ import (
"fmt"
"github.com/spf13/cobra"
"io/ioutil"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"log"
"time"
)

var stepName string
type runCmdFlagsStruct struct {
StepName string
}

const defaultPollInterval = 5 * time.Second
const defaultTimeout = 120 * time.Second

var runCmdFlags *runCmdFlagsStruct
var clientset *kubernetes.Clientset

var runCmd = &cobra.Command{
Use: "run [pipeline_yaml] [-s step_name]",
Use: "run [pipeline_yaml]",
Short: "Run a pipeline or a pipeline step",
Args: cobra.ExactArgs(1),
Long: `Store data into S3 under a versioned path, and update HEAD.
Expand All @@ -35,41 +49,153 @@ Example:
$ paddle pipeline run test_pipeline.yaml
`,
Run: func(cmd *cobra.Command, args []string) {
runPipeline(args[0])
runPipeline(args[0], runCmdFlags)
},
}

func init() {
runCmd.Flags().StringVarP(&stepName, "step", "s", "", "Single step to execute")
}
runCmdFlags = &runCmdFlagsStruct{}
runCmd.Flags().StringVarP(&runCmdFlags.StepName, "step", "s", "", "Single step to execute")

func runPipeline(path string) {
config, err := getKubernetesConfig()
if err != nil {
panic(err.Error())
}
clientset, err := kubernetes.NewForConfig(config)
clientset, err = kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
}

func runPipeline(path string, flags *runCmdFlagsStruct) {
data, err := ioutil.ReadFile(path)
if err != nil {
panic(err.Error())
}
pipeline := parsePipeline(data)
namespace := pipeline.Namespace
list, err := clientset.CoreV1().Pods(namespace).List(v1.ListOptions{})
// namespace := pipeline.Namespace
// list, err := clientset.CoreV1().Pods(namespace).List(metav1.ListOptions{})
// if err != nil {
// panic(err.Error())
// }
// fmt.Printf("{}\n", list)
for _, step := range pipeline.Steps {
if flags.StepName != "" && step.Step != flags.StepName {
continue
}
err = runPipelineStep(pipeline, &step, flags)
if err != nil {
log.Fatalf("pipeline step failed: %s", err.Error())
}
}
// for _, step := range pipeline.Steps {
// stepPod := compilePodTemplate(pipeline, &step)
// decode := scheme.Codecs.UniversalDeserializer().Decode
// obj, groupVersionKind, err := decode([]byte(stepPod), nil, nil)

// if err != nil {
// log.Fatal(fmt.Sprintf("Error while decoding YAML object. Err was: %s", err))
// }
// }
}

func runPipelineStep(pipeline *PipelineDefinition, step *PipelineDefinitionStep, flags *runCmdFlagsStruct) error {
log.Printf("[paddle] Running step %s", step.Step)
podDefinition := NewPodDefinition(pipeline, step)
stepPodBuffer := podDefinition.compile()
pod := &v1.Pod{}
yaml.NewYAMLOrJSONDecoder(stepPodBuffer, 4096).Decode(pod)
pods := clientset.CoreV1().Pods(pipeline.Namespace)

err := deleteAndWait(clientset, podDefinition)
if err != nil {
panic(err.Error())
return err
}
fmt.Printf("{}", list)
for _, step := range pipeline.Steps {
stepPod := compilePodTemplate(pipeline, &step)
decode := scheme.Codecs.UniversalDeserializer().Decode
obj, groupVersionKind, err := decode([]byte(stepPod), nil, nil)

if err != nil {
log.Fatal(fmt.Sprintf("Error while decoding YAML object. Err was: %s", err))
pod, err = pods.Create(pod)
if err != nil {
return err
}

stopWatching := make(chan bool)
defer close(stopWatching)

watcher, err := NewPodWatcher(clientset, pod, stopWatching)
if err != nil {
return err
}

for {
event, ok := <-watcher
if !ok {
stopWatching <- true
return fmt.Errorf("pod %s channel has been closed ", pod.Name)
}
switch event.Object.(type) {
case *v1.Pod:
eventPod := event.Object.(*v1.Pod)
switch event.Type {
case watch.Added, watch.Modified:
if eventPod.Status.Phase == v1.PodSucceeded {
watcher = nil
break
log.Printf("Post succeed")

}
if eventPod.Status.Phase == v1.PodFailed {
stopWatching <- true
return fmt.Errorf("pod failed: '%s'", eventPod.Status.Message)
}
for i := 0; i < len(eventPod.Status.ContainerStatuses); i++ {
containerStatus := eventPod.Status.ContainerStatuses[i]
term := containerStatus.State.Terminated
if term != nil && term.ExitCode != 0 {
return fmt.Errorf("pod container %s exited with error %s", containerStatus.Name, term.Message)
}
}
case watch.Deleted:
stopWatching <- true
return fmt.Errorf("pod deleted")
case watch.Error:
stopWatching <- true
return fmt.Errorf("pod error")
}
}
if watcher == nil {
break
}
}

stopWatching <- true

err = pods.Delete(podDefinition.PodName, &metav1.DeleteOptions{})
if err != nil {
return err
}
log.Printf("[paddle] Finishing pod execution")
return nil
}

func deleteAndWait(c *kubernetes.Clientset, podDefinition *PodDefinition) error {
pods := clientset.CoreV1().Pods(podDefinition.Namespace)
deleting := false
err := wait.PollImmediate(defaultPollInterval, defaultTimeout, func() (bool, error) {
var err error
err = pods.Delete(podDefinition.PodName, &metav1.DeleteOptions{})
if err != nil {
if errors.IsNotFound(err) {
return true, nil
} else {
return true, err
}
}
if deleting {
log.Print("[paddle] .")
} else {
log.Printf("[paddle] deleting pod %s", podDefinition.PodName)
deleting = true
}
return false, nil
})
return err
}
23 changes: 14 additions & 9 deletions cli/pipeline/template.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package pipeline

import (
"bytes"
"fmt"
"os"
"strings"
"text/template"
)

type PodTemplateData struct {
type PodDefinition struct {
PodName string
StepName string
BranchName string
Expand Down Expand Up @@ -123,27 +123,32 @@ spec:
key: aws-secret-access-key
`

func compilePodTemplate(pipelineDefinition *PipelineDefinition, pipelineDefinitionStep *PipelineDefinitionStep) string {
fmap := template.FuncMap{
"sanitizeName": sanitizeName,
}
func NewPodDefinition(pipelineDefinition *PipelineDefinition, pipelineDefinitionStep *PipelineDefinitionStep) *PodDefinition {
stepName := sanitizeName(pipelineDefinitionStep.Step)
branchName := sanitizeName(pipelineDefinitionStep.Branch)
podName := fmt.Sprintf("%s-%s-%s", sanitizeName(pipelineDefinition.Pipeline), stepName, branchName)
templateData := PodTemplateData{
return &PodDefinition{
PodName: podName,
Namespace: pipelineDefinition.Namespace,
Step: *pipelineDefinitionStep,
Bucket: pipelineDefinition.Bucket,
StepName: stepName,
BranchName: branchName,
}

}

func (p PodDefinition) compile() *bytes.Buffer {
fmap := template.FuncMap{
"sanitizeName": sanitizeName,
}
tmpl := template.Must(template.New("podTemplate").Funcs(fmap).Parse(podTemplate))
err := tmpl.Execute(os.Stdout, templateData)
buffer := new(bytes.Buffer)
err := tmpl.Execute(buffer, p)
if err != nil {
panic(err.Error())
}
return ""
return buffer
}

func sanitizeName(name string) string {
Expand Down
48 changes: 48 additions & 0 deletions cli/pipeline/watch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package pipeline

import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
)

func NewPodWatcher(c *kubernetes.Clientset, pod *v1.Pod, stopChannel chan bool) (<-chan watch.Event, error) {
podSelector, err := fields.ParseSelector("metadata.name=" + pod.Name)
if err != nil {
return nil, err
}
options := metav1.ListOptions{
FieldSelector: podSelector.String(),
Watch: true,
}

podWatch, err := c.CoreV1().Pods(pod.Namespace).Watch(options)

eventCh := make(chan watch.Event, 30)

go func() {
defer podWatch.Stop()
defer close(eventCh)
var podWatchChannelClosed bool
for {
select {
case _ = <-stopChannel:
return

case podEvent, ok := <-podWatch.ResultChan():
if !ok {
podWatchChannelClosed = true
} else {
eventCh <- podEvent
}
}
if podWatchChannelClosed {
break
}
}
}()

return eventCh, nil
}
Loading

0 comments on commit 4415e63

Please sign in to comment.