diff --git a/cli/pipeline/run.go b/cli/pipeline/run.go index 8bc2f59..c36bd2c 100644 --- a/cli/pipeline/run.go +++ b/cli/pipeline/run.go @@ -17,15 +17,29 @@ import ( "fmt" "github.com/spf13/cobra" "io/ioutil" - "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/util/yaml" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" + "log" + "time" ) -var stepName string +type runCmdFlagsStruct struct { + StepName string +} + +const defaultPollInterval = 5 * time.Second +const defaultTimeout = 120 * time.Second + +var runCmdFlags *runCmdFlagsStruct +var clientset *kubernetes.Clientset var runCmd = &cobra.Command{ - Use: "run [pipeline_yaml] [-s step_name]", + Use: "run [pipeline_yaml]", Short: "Run a pipeline or a pipeline step", Args: cobra.ExactArgs(1), Long: `Store data into S3 under a versioned path, and update HEAD. @@ -35,41 +49,153 @@ Example: $ paddle pipeline run test_pipeline.yaml `, Run: func(cmd *cobra.Command, args []string) { - runPipeline(args[0]) + runPipeline(args[0], runCmdFlags) }, } func init() { - runCmd.Flags().StringVarP(&stepName, "step", "s", "", "Single step to execute") -} + runCmdFlags = &runCmdFlagsStruct{} + runCmd.Flags().StringVarP(&runCmdFlags.StepName, "step", "s", "", "Single step to execute") -func runPipeline(path string) { config, err := getKubernetesConfig() if err != nil { panic(err.Error()) } - clientset, err := kubernetes.NewForConfig(config) + clientset, err = kubernetes.NewForConfig(config) if err != nil { panic(err.Error()) } +} + +func runPipeline(path string, flags *runCmdFlagsStruct) { data, err := ioutil.ReadFile(path) if err != nil { panic(err.Error()) } pipeline := parsePipeline(data) - namespace := pipeline.Namespace - list, err := clientset.CoreV1().Pods(namespace).List(v1.ListOptions{}) + // namespace := pipeline.Namespace + // list, err := clientset.CoreV1().Pods(namespace).List(metav1.ListOptions{}) + // if err != nil { + // panic(err.Error()) + // } + // fmt.Printf("{}\n", list) + for _, step := range pipeline.Steps { + if flags.StepName != "" && step.Step != flags.StepName { + continue + } + err = runPipelineStep(pipeline, &step, flags) + if err != nil { + log.Fatalf("pipeline step failed: %s", err.Error()) + } + } + // for _, step := range pipeline.Steps { + // stepPod := compilePodTemplate(pipeline, &step) + // decode := scheme.Codecs.UniversalDeserializer().Decode + // obj, groupVersionKind, err := decode([]byte(stepPod), nil, nil) + + // if err != nil { + // log.Fatal(fmt.Sprintf("Error while decoding YAML object. Err was: %s", err)) + // } + // } +} + +func runPipelineStep(pipeline *PipelineDefinition, step *PipelineDefinitionStep, flags *runCmdFlagsStruct) error { + log.Printf("[paddle] Running step %s", step.Step) + podDefinition := NewPodDefinition(pipeline, step) + stepPodBuffer := podDefinition.compile() + pod := &v1.Pod{} + yaml.NewYAMLOrJSONDecoder(stepPodBuffer, 4096).Decode(pod) + pods := clientset.CoreV1().Pods(pipeline.Namespace) + + err := deleteAndWait(clientset, podDefinition) if err != nil { - panic(err.Error()) + return err } - fmt.Printf("{}", list) - for _, step := range pipeline.Steps { - stepPod := compilePodTemplate(pipeline, &step) - decode := scheme.Codecs.UniversalDeserializer().Decode - obj, groupVersionKind, err := decode([]byte(stepPod), nil, nil) - if err != nil { - log.Fatal(fmt.Sprintf("Error while decoding YAML object. Err was: %s", err)) + pod, err = pods.Create(pod) + if err != nil { + return err + } + + stopWatching := make(chan bool) + defer close(stopWatching) + + watcher, err := NewPodWatcher(clientset, pod, stopWatching) + if err != nil { + return err + } + + for { + event, ok := <-watcher + if !ok { + stopWatching <- true + return fmt.Errorf("pod %s channel has been closed ", pod.Name) + } + switch event.Object.(type) { + case *v1.Pod: + eventPod := event.Object.(*v1.Pod) + switch event.Type { + case watch.Added, watch.Modified: + if eventPod.Status.Phase == v1.PodSucceeded { + watcher = nil + break + log.Printf("Post succeed") + + } + if eventPod.Status.Phase == v1.PodFailed { + stopWatching <- true + return fmt.Errorf("pod failed: '%s'", eventPod.Status.Message) + } + for i := 0; i < len(eventPod.Status.ContainerStatuses); i++ { + containerStatus := eventPod.Status.ContainerStatuses[i] + term := containerStatus.State.Terminated + if term != nil && term.ExitCode != 0 { + return fmt.Errorf("pod container %s exited with error %s", containerStatus.Name, term.Message) + } + } + case watch.Deleted: + stopWatching <- true + return fmt.Errorf("pod deleted") + case watch.Error: + stopWatching <- true + return fmt.Errorf("pod error") + } + } + if watcher == nil { + break } } + + stopWatching <- true + + err = pods.Delete(podDefinition.PodName, &metav1.DeleteOptions{}) + if err != nil { + return err + } + log.Printf("[paddle] Finishing pod execution") + return nil +} + +func deleteAndWait(c *kubernetes.Clientset, podDefinition *PodDefinition) error { + pods := clientset.CoreV1().Pods(podDefinition.Namespace) + deleting := false + err := wait.PollImmediate(defaultPollInterval, defaultTimeout, func() (bool, error) { + var err error + err = pods.Delete(podDefinition.PodName, &metav1.DeleteOptions{}) + if err != nil { + if errors.IsNotFound(err) { + return true, nil + } else { + return true, err + } + } + if deleting { + log.Print("[paddle] .") + } else { + log.Printf("[paddle] deleting pod %s", podDefinition.PodName) + deleting = true + } + return false, nil + }) + return err } diff --git a/cli/pipeline/template.go b/cli/pipeline/template.go index b669375..d8d9e31 100644 --- a/cli/pipeline/template.go +++ b/cli/pipeline/template.go @@ -1,13 +1,13 @@ package pipeline import ( + "bytes" "fmt" - "os" "strings" "text/template" ) -type PodTemplateData struct { +type PodDefinition struct { PodName string StepName string BranchName string @@ -123,14 +123,11 @@ spec: key: aws-secret-access-key ` -func compilePodTemplate(pipelineDefinition *PipelineDefinition, pipelineDefinitionStep *PipelineDefinitionStep) string { - fmap := template.FuncMap{ - "sanitizeName": sanitizeName, - } +func NewPodDefinition(pipelineDefinition *PipelineDefinition, pipelineDefinitionStep *PipelineDefinitionStep) *PodDefinition { stepName := sanitizeName(pipelineDefinitionStep.Step) branchName := sanitizeName(pipelineDefinitionStep.Branch) podName := fmt.Sprintf("%s-%s-%s", sanitizeName(pipelineDefinition.Pipeline), stepName, branchName) - templateData := PodTemplateData{ + return &PodDefinition{ PodName: podName, Namespace: pipelineDefinition.Namespace, Step: *pipelineDefinitionStep, @@ -138,12 +135,20 @@ func compilePodTemplate(pipelineDefinition *PipelineDefinition, pipelineDefiniti StepName: stepName, BranchName: branchName, } + +} + +func (p PodDefinition) compile() *bytes.Buffer { + fmap := template.FuncMap{ + "sanitizeName": sanitizeName, + } tmpl := template.Must(template.New("podTemplate").Funcs(fmap).Parse(podTemplate)) - err := tmpl.Execute(os.Stdout, templateData) + buffer := new(bytes.Buffer) + err := tmpl.Execute(buffer, p) if err != nil { panic(err.Error()) } - return "" + return buffer } func sanitizeName(name string) string { diff --git a/cli/pipeline/watch.go b/cli/pipeline/watch.go new file mode 100644 index 0000000..7873c77 --- /dev/null +++ b/cli/pipeline/watch.go @@ -0,0 +1,48 @@ +package pipeline + +import ( + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" +) + +func NewPodWatcher(c *kubernetes.Clientset, pod *v1.Pod, stopChannel chan bool) (<-chan watch.Event, error) { + podSelector, err := fields.ParseSelector("metadata.name=" + pod.Name) + if err != nil { + return nil, err + } + options := metav1.ListOptions{ + FieldSelector: podSelector.String(), + Watch: true, + } + + podWatch, err := c.CoreV1().Pods(pod.Namespace).Watch(options) + + eventCh := make(chan watch.Event, 30) + + go func() { + defer podWatch.Stop() + defer close(eventCh) + var podWatchChannelClosed bool + for { + select { + case _ = <-stopChannel: + return + + case podEvent, ok := <-podWatch.ResultChan(): + if !ok { + podWatchChannelClosed = true + } else { + eventCh <- podEvent + } + } + if podWatchChannelClosed { + break + } + } + }() + + return eventCh, nil +} diff --git a/glide.lock b/glide.lock index 845dc0c..5bf8ae6 100644 --- a/glide.lock +++ b/glide.lock @@ -1,8 +1,8 @@ -hash: 4a4c528b40066dab94dbbb908b88a4bb99fc963edf6a538ed47d49d778f71d3c -updated: 2017-12-22T16:44:13.9306187Z +hash: 8be82b4770683548c48ea100f6395f68896018f3bc2119d69e22d459aeea91ea +updated: 2017-12-22T22:34:41.613793Z imports: - name: github.com/aws/aws-sdk-go - version: 32d0e45c3f93cd20c25614183246d7e34bc7385c + version: 82ad808f2307df0776c038bfd7ea85440a35c02e subpackages: - aws - aws/awserr @@ -31,6 +31,10 @@ imports: - service/s3/s3iface - service/s3/s3manager - service/sts +- name: github.com/davecgh/go-spew + version: 346938d642f2ec3594ed81d874461961cd0faa76 + subpackages: + - spew - name: github.com/emicklei/go-restful version: ff4f55a206334ef123e4f79bbf348980da81ca46 subpackages: @@ -78,6 +82,10 @@ imports: version: 787624de3eb7bd915c329cba748687a3b22666a6 subpackages: - diskcache +- name: github.com/hashicorp/golang-lru + version: a0d98a5f288019575c6d1f4bb1573fef2d1fcdc4 + subpackages: + - simplelru - name: github.com/hashicorp/hcl version: 23c074d0eceb2b8a5bfdbb271ab780cde70f05a8 subpackages: @@ -143,7 +151,6 @@ imports: version: 1c05540f6879653db88113bc4a2b70aec4bd491f subpackages: - context - - context/ctxhttp - http2 - http2/hpack - idna @@ -206,17 +213,9 @@ imports: - name: k8s.io/apimachinery version: 180eddb345a5be3a157cea1c624700ad5bd27b8f subpackages: - - pkg/api/equality - pkg/api/errors - pkg/api/meta - pkg/api/resource - - pkg/api/testing - - pkg/api/testing/fuzzer - - pkg/api/testing/roundtrip - - pkg/apimachinery - - pkg/apimachinery/announced - - pkg/apimachinery/registered - - pkg/apis/meta/fuzzer - pkg/apis/meta/internalversion - pkg/apis/meta/v1 - pkg/apis/meta/v1/unstructured @@ -240,24 +239,17 @@ imports: - pkg/util/diff - pkg/util/errors - pkg/util/framer - - pkg/util/httpstream - - pkg/util/httpstream/spdy - pkg/util/intstr - pkg/util/json - - pkg/util/mergepatch - pkg/util/net - - pkg/util/remotecommand - pkg/util/runtime - pkg/util/sets - - pkg/util/strategicpatch - pkg/util/validation - pkg/util/validation/field - pkg/util/wait - pkg/util/yaml - pkg/version - pkg/watch - - third_party/forked/golang/json - - third_party/forked/golang/netutil - third_party/forked/golang/reflect - name: k8s.io/client-go version: 78700dec6369ba22221b72770783300f143df150 @@ -297,13 +289,16 @@ imports: - rest - rest/watch - tools/auth + - tools/cache - tools/clientcmd - tools/clientcmd/api - tools/clientcmd/api/latest - tools/clientcmd/api/v1 - tools/metrics + - tools/pager - tools/reference - transport + - util/buffer - util/cert - util/flowcontrol - util/homedir @@ -312,5 +307,4 @@ imports: version: 39a7bf85c140f972372c2a0d1ee40adbf0c8bfe1 subpackages: - pkg/common - - pkg/util/proto testImports: [] diff --git a/glide.yaml b/glide.yaml index 7cf3178..2ec370e 100644 --- a/glide.yaml +++ b/glide.yaml @@ -16,6 +16,10 @@ import: version: ^1.0.0 - package: k8s.io/client-go version: v6.0.0 -- package: github.com/gregjones/httpcache # k8s dependency -- package: github.com/imdario/mergo # k8s dependency +- package: github.com/gregjones/httpcache +- package: github.com/imdario/mergo - package: gopkg.in/yaml.v2 +- package: github.com/davecgh/go-spew + version: ^1.1.0 + subpackages: + - spew