From deab6e819c12ba07b9495c28fb0ad9dccf020c8a Mon Sep 17 00:00:00 2001 From: Ivan Pirlik Date: Fri, 22 Dec 2017 18:52:58 +0000 Subject: [PATCH 01/17] WIP --- .gitignore | 1 + README.md | 7 + cli/pipeline/cmd.go | 28 +++ cli/pipeline/kubernetes.go | 33 +++ cli/pipeline/pipeline_definition.go | 41 ++++ cli/pipeline/run.go | 68 ++++++ cli/pipeline/template.go | 154 ++++++++++++++ cli/root.go | 3 +- glide.lock | 316 ++++++++++++++++++++++++++++ glide.yaml | 21 ++ 10 files changed, 671 insertions(+), 1 deletion(-) create mode 100644 cli/pipeline/cmd.go create mode 100644 cli/pipeline/kubernetes.go create mode 100644 cli/pipeline/pipeline_definition.go create mode 100644 cli/pipeline/run.go create mode 100644 cli/pipeline/template.go create mode 100644 glide.lock create mode 100644 glide.yaml diff --git a/.gitignore b/.gitignore index bfe1862..69fab0b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ tmp dist paddle +vendor diff --git a/README.md b/README.md index 3a74b61..12e1798 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,13 @@ git clone git@github.com:deliveroo/paddle.git cd paddle ``` +Install dependencies: + +``` +brew install glide +glide up -v +``` + You will need create a `$HOME/.paddle.yaml` that contains the bucket name, e.g: ``` diff --git a/cli/pipeline/cmd.go b/cli/pipeline/cmd.go new file mode 100644 index 0000000..6cee888 --- /dev/null +++ b/cli/pipeline/cmd.go @@ -0,0 +1,28 @@ +// Copyright © 2017 RooFoods LTD +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipeline + +import ( + "github.com/spf13/cobra" +) + +var PipelineCmd = &cobra.Command{ + Use: "pipeline", + Short: "Manage Canoe pipelines", + Long: "Run and control Canoe pipelines", +} + +func init() { + PipelineCmd.AddCommand(runCmd) +} diff --git a/cli/pipeline/kubernetes.go b/cli/pipeline/kubernetes.go new file mode 100644 index 0000000..42c9522 --- /dev/null +++ b/cli/pipeline/kubernetes.go @@ -0,0 +1,33 @@ +package pipeline + +import ( + "flag" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "os" + "path/filepath" +) + +func getKubernetesConfig() (*rest.Config, error) { + var config *rest.Config + var kubeconfig *string + if home := homeDir(); home != "" { + kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file") + } else { + kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file") + } + flag.Parse() + + config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig) + if err != nil { + config, err = rest.InClusterConfig() + } + return config, err +} + +func homeDir() string { + if h := os.Getenv("HOME"); h != "" { + return h + } + return os.Getenv("USERPROFILE") // windows +} diff --git a/cli/pipeline/pipeline_definition.go b/cli/pipeline/pipeline_definition.go new file mode 100644 index 0000000..b5a0b18 --- /dev/null +++ b/cli/pipeline/pipeline_definition.go @@ -0,0 +1,41 @@ +package pipeline + +import ( + "gopkg.in/yaml.v2" + "log" +) + +type PipelineDefinitionStep struct { + Step string `yaml:"step"` + Version string `yaml:"version"` + Branch string `yaml:"branch"` + Image string `yaml:"image"` + Inputs []struct { + Step string `yaml:"step"` + Version string `yaml:"version"` + Branch string `yaml:"branch"` + Path string `yaml:"path"` + } `yaml:"inputs"` + Commands []string `yaml:"commands"` + Resources struct { + CPU int `yaml:"cpu"` + Memory string `yaml:"memory"` + } `yaml:"resources"` +} + +type PipelineDefinition struct { + Pipeline string `yaml:"pipeline"` + Bucket string `yaml:"bucket"` + Namespace string `yaml:"namespace"` + Steps []PipelineDefinitionStep `yaml:"steps"` +} + +func parsePipeline(data []byte) *PipelineDefinition { + pipeline := PipelineDefinition{} + + err := yaml.Unmarshal(data, &pipeline) + if err != nil { + log.Fatalf("error: %v", err) + } + return &pipeline +} diff --git a/cli/pipeline/run.go b/cli/pipeline/run.go new file mode 100644 index 0000000..7c53869 --- /dev/null +++ b/cli/pipeline/run.go @@ -0,0 +1,68 @@ +// Copyright © 2017 RooFoods LTD +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipeline + +import ( + "fmt" + "github.com/spf13/cobra" + "io/ioutil" + "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" +) + +var stepName string + +var runCmd = &cobra.Command{ + Use: "run [pipeline_yaml] [-s step_name]", + Short: "Run a pipeline or a pipeline step", + Args: cobra.ExactArgs(1), + Long: `Store data into S3 under a versioned path, and update HEAD. + +Example: + +$ paddle pipeline run test_pipeline.yaml +`, + Run: func(cmd *cobra.Command, args []string) { + runPipeline(args[0]) + }, +} + +func init() { + runCmd.Flags().StringVarP(&stepName, "step", "s", "", "Single step to execute") +} + +func runPipeline(path string) { + config, err := getKubernetesConfig() + if err != nil { + panic(err.Error()) + } + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + panic(err.Error()) + } + data, err := ioutil.ReadFile(path) + if err != nil { + panic(err.Error()) + } + pipeline := parsePipeline(data) + namespace := pipeline.Namespace + list, err := clientset.CoreV1().Pods(namespace).List(v1.ListOptions{}) + if err != nil { + panic(err.Error()) + } + fmt.Printf("{}", list) + for _, step := range pipeline.Steps { + compilePodTemplate(pipeline, &step) + } +} diff --git a/cli/pipeline/template.go b/cli/pipeline/template.go new file mode 100644 index 0000000..b669375 --- /dev/null +++ b/cli/pipeline/template.go @@ -0,0 +1,154 @@ +package pipeline + +import ( + "fmt" + "os" + "strings" + "text/template" +) + +type PodTemplateData struct { + PodName string + StepName string + BranchName string + Namespace string + Bucket string + + Step PipelineDefinitionStep +} + +const podTemplate = ` +apiVersion: v1 +kind: Pod +metadata: + name: "{{ .PodName }}" + namespace: {{ .Namespace }} + labels: + pipeline: canoe +spec: + restartPolicy: Never + volumes: + - + name: shared-data + emptyDir: + medium: '' + containers: + - + name: main + image: "{{ .Step.Image }}" + volumeMounts: + - + name: shared-data + mountPath: /data + resources: + limits: + cpu: "{{ .Step.Resources.CPU }}" + memory: "{{ .Step.Resources.Memory }}" + command: + - "/bin/sh" + - "-c" + - "while true; do if [ -e /data/first-step.txt ]; then (( + {{ range $index, $command := .Step.Commands }} + ({{ $command }}) && + {{ end }} + touch /data/main-passed.txt) || (touch /data/main-failed.txt && exit 1)) && touch /data/main.txt; break; fi; done" + env: + - + name: INPUT_PATH + value: /data/input + - + name: OUTPUT_PATH + value: /data/output + - + name: AWS_ACCESS_KEY_ID + valueFrom: + secretKeyRef: + name: aws-credentials-training + key: aws-access-key-id + - + name: AWS_SECRET_ACCESS_KEY + valueFrom: + secretKeyRef: + name: aws-credentials-training + key: aws-secret-access-key + - + name: paddle + image: "219541440308.dkr.ecr.eu-west-1.amazonaws.com/paddlecontainer:latest" + volumeMounts: + - + name: shared-data + mountPath: /data + command: + - "/bin/sh" + - "-c" + - "mkdir -p $INPUT_PATH $OUTPUT_PATH && + {{ range $index, $input := .Step.Inputs }} + paddle data get {{ $input.Step }}/{{ $input.Version }} $INPUT_PATH -b {{ $input.Branch | sanitizeName }} -p {{ $input.Path }} && + {{ end }} + touch /data/first-step.txt && + echo first step finished && + (while true; do + if [ -e /data/main-failed.txt ]; then + exit 1; + fi; + if [ -e /data/main-passed.txt ]; then + paddle data commit $OUTPUT_PATH {{ .StepName }}/{{ .Step.Version }} -b {{ .BranchName }}; + exit 0; + fi; + done)" + env: + - + name: BUCKET + value: "{{ .Bucket }}" + - + name: AWS_REGION + value: eu-west-1 + - + name: INPUT_PATH + value: /data/input + - + name: OUTPUT_PATH + value: /data/output + - + name: AWS_ACCESS_KEY_ID + valueFrom: + secretKeyRef: + name: aws-credentials + key: aws-access-key-id + - + name: AWS_SECRET_ACCESS_KEY + valueFrom: + secretKeyRef: + name: aws-credentials + key: aws-secret-access-key +` + +func compilePodTemplate(pipelineDefinition *PipelineDefinition, pipelineDefinitionStep *PipelineDefinitionStep) string { + fmap := template.FuncMap{ + "sanitizeName": sanitizeName, + } + stepName := sanitizeName(pipelineDefinitionStep.Step) + branchName := sanitizeName(pipelineDefinitionStep.Branch) + podName := fmt.Sprintf("%s-%s-%s", sanitizeName(pipelineDefinition.Pipeline), stepName, branchName) + templateData := PodTemplateData{ + PodName: podName, + Namespace: pipelineDefinition.Namespace, + Step: *pipelineDefinitionStep, + Bucket: pipelineDefinition.Bucket, + StepName: stepName, + BranchName: branchName, + } + tmpl := template.Must(template.New("podTemplate").Funcs(fmap).Parse(podTemplate)) + err := tmpl.Execute(os.Stdout, templateData) + if err != nil { + panic(err.Error()) + } + return "" +} + +func sanitizeName(name string) string { + str := strings.ToLower(name) + str = strings.Replace(str, "_", "-", -1) + str = strings.Replace(str, "/", "-", -1) + return str +} diff --git a/cli/root.go b/cli/root.go index ae2c4c7..8be6033 100644 --- a/cli/root.go +++ b/cli/root.go @@ -18,6 +18,7 @@ import ( "os" "github.com/deliveroo/paddle/cli/data" + "github.com/deliveroo/paddle/cli/pipeline" homedir "github.com/mitchellh/go-homedir" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -54,7 +55,7 @@ func init() { // RootCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") RootCmd.AddCommand(data.DataCmd) - + RootCmd.AddCommand(pipeline.PipelineCmd) } // initConfig reads in config file and ENV variables if set. diff --git a/glide.lock b/glide.lock new file mode 100644 index 0000000..845dc0c --- /dev/null +++ b/glide.lock @@ -0,0 +1,316 @@ +hash: 4a4c528b40066dab94dbbb908b88a4bb99fc963edf6a538ed47d49d778f71d3c +updated: 2017-12-22T16:44:13.9306187Z +imports: +- name: github.com/aws/aws-sdk-go + version: 32d0e45c3f93cd20c25614183246d7e34bc7385c + subpackages: + - aws + - aws/awserr + - aws/awsutil + - aws/client + - aws/client/metadata + - aws/corehandlers + - aws/credentials + - aws/credentials/ec2rolecreds + - aws/credentials/endpointcreds + - aws/credentials/stscreds + - aws/defaults + - aws/ec2metadata + - aws/endpoints + - aws/request + - aws/session + - aws/signer/v4 + - internal/shareddefaults + - private/protocol + - private/protocol/query + - private/protocol/query/queryutil + - private/protocol/rest + - private/protocol/restxml + - private/protocol/xml/xmlutil + - service/s3 + - service/s3/s3iface + - service/s3/s3manager + - service/sts +- name: github.com/emicklei/go-restful + version: ff4f55a206334ef123e4f79bbf348980da81ca46 + subpackages: + - log +- name: github.com/fsnotify/fsnotify + version: 4da3e2cfbabc9f751898f250b49f2439785783a1 +- name: github.com/ghodss/yaml + version: 73d445a93680fa1a78ae23a5839bad48f32ba1ee +- name: github.com/go-ini/ini + version: 32e4c1e6bc4e7d0d8451aa6b75200d19e37a536a +- name: github.com/go-openapi/jsonpointer + version: 46af16f9f7b149af66e5d1bd010e3574dc06de98 +- name: github.com/go-openapi/jsonreference + version: 13c6e3589ad90f49bd3e3bbe2c2cb3d7a4142272 +- name: github.com/go-openapi/spec + version: 7abd5745472fff5eb3685386d5fb8bf38683154d +- name: github.com/go-openapi/swag + version: f3f9494671f93fcff853e3c6e9e948b3eb71e590 +- name: github.com/gogo/protobuf + version: c0656edd0d9eab7c66d1eb0c568f9039345796f7 + subpackages: + - proto + - sortkeys +- name: github.com/golang/glog + version: 44145f04b68cf362d9c4df2182967c2275eaefed +- name: github.com/golang/protobuf + version: 1643683e1b54a9e88ad26d98f81400c8c9d9f4f9 + subpackages: + - proto + - ptypes + - ptypes/any + - ptypes/duration + - ptypes/timestamp +- name: github.com/google/btree + version: 7d79101e329e5a3adf994758c578dab82b90c017 +- name: github.com/google/gofuzz + version: 44d81051d367757e1c7c6a5a86423ece9afcf63c +- name: github.com/googleapis/gnostic + version: 0c5108395e2debce0d731cf0287ddf7242066aba + subpackages: + - OpenAPIv2 + - compiler + - extensions +- name: github.com/gregjones/httpcache + version: 787624de3eb7bd915c329cba748687a3b22666a6 + subpackages: + - diskcache +- name: github.com/hashicorp/hcl + version: 23c074d0eceb2b8a5bfdbb271ab780cde70f05a8 + subpackages: + - hcl/ast + - hcl/parser + - hcl/scanner + - hcl/strconv + - hcl/token + - json/parser + - json/scanner + - json/token +- name: github.com/howeyc/gopass + version: bf9dde6d0d2c004a008c27aaee91170c786f6db8 +- name: github.com/imdario/mergo + version: 6633656539c1639d9d78127b7d47c622b5d7b6dc +- name: github.com/inconshreveable/mousetrap + version: 76626ae9c91c4f2a10f34cad8ce83ea42c93bb75 +- name: github.com/jmespath/go-jmespath + version: dd801d4f4ce7ac746e7e7b4489d2fa600b3b096b +- name: github.com/json-iterator/go + version: 36b14963da70d11297d313183d7e6388c8510e1e +- name: github.com/juju/ratelimit + version: 5b9ff866471762aa2ab2dced63c9fb6f53921342 +- name: github.com/magiconair/properties + version: 49d762b9817ba1c2e9d0c69183c2b4a8b8f1d934 +- name: github.com/mailru/easyjson + version: 2f5df55504ebc322e4d52d34df6a1f5b503bf26d + subpackages: + - buffer + - jlexer + - jwriter +- name: github.com/mitchellh/go-homedir + version: b8bc1bf767474819792c23f32d8286a45736f1c6 +- name: github.com/mitchellh/mapstructure + version: 06020f85339e21b2478f756a78e295255ffa4d6a +- name: github.com/pelletier/go-toml + version: 0131db6d737cfbbfb678f8b7d92e55e27ce46224 +- name: github.com/peterbourgon/diskv + version: 5f041e8faa004a95c88a202771f4cc3e991971e6 +- name: github.com/PuerkitoBio/purell + version: 8a290539e2e8629dbc4e6bad948158f790ec31f4 +- name: github.com/PuerkitoBio/urlesc + version: 5bd2802263f21d8788851d5305584c82a5c75d7e +- name: github.com/spf13/afero + version: 8d919cbe7e2627e417f3e45c3c0e489a5b7e2536 + subpackages: + - mem +- name: github.com/spf13/cast + version: acbeb36b902d72a7a4c18e8f3241075e7ab763e4 +- name: github.com/spf13/cobra + version: 7b2c5ac9fc04fc5efafb60700713d4fa609b777b +- name: github.com/spf13/jwalterweatherman + version: 12bd96e66386c1960ab0f74ced1362f66f552f7b +- name: github.com/spf13/pflag + version: 4c012f6dcd9546820e378d0bdda4d8fc772cdfea +- name: github.com/spf13/viper + version: 25b30aa063fc18e48662b86996252eabdcf2f0c7 +- name: golang.org/x/crypto + version: 81e90905daefcd6fd217b62423c0908922eadb30 + subpackages: + - ssh/terminal +- name: golang.org/x/net + version: 1c05540f6879653db88113bc4a2b70aec4bd491f + subpackages: + - context + - context/ctxhttp + - http2 + - http2/hpack + - idna + - lex/httplex +- name: golang.org/x/sys + version: 95c6576299259db960f6c5b9b69ea52422860fce + subpackages: + - unix + - windows +- name: golang.org/x/text + version: b19bf474d317b857955b12035d2c5acb57ce8b01 + subpackages: + - cases + - internal + - internal/tag + - language + - runes + - secure/bidirule + - secure/precis + - transform + - unicode/bidi + - unicode/norm + - width +- name: gopkg.in/inf.v0 + version: 3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4 +- name: gopkg.in/yaml.v2 + version: 287cf08546ab5e7e37d55a84f7ed3fd1db036de5 +- name: k8s.io/api + version: 11147472b7c934c474a2c484af3c0c5210b7a3af + subpackages: + - admissionregistration/v1alpha1 + - admissionregistration/v1beta1 + - apps/v1 + - apps/v1beta1 + - apps/v1beta2 + - authentication/v1 + - authentication/v1beta1 + - authorization/v1 + - authorization/v1beta1 + - autoscaling/v1 + - autoscaling/v2beta1 + - batch/v1 + - batch/v1beta1 + - batch/v2alpha1 + - certificates/v1beta1 + - core/v1 + - events/v1beta1 + - extensions/v1beta1 + - imagepolicy/v1alpha1 + - networking/v1 + - policy/v1beta1 + - rbac/v1 + - rbac/v1alpha1 + - rbac/v1beta1 + - scheduling/v1alpha1 + - settings/v1alpha1 + - storage/v1 + - storage/v1alpha1 + - storage/v1beta1 +- name: k8s.io/apimachinery + version: 180eddb345a5be3a157cea1c624700ad5bd27b8f + subpackages: + - pkg/api/equality + - pkg/api/errors + - pkg/api/meta + - pkg/api/resource + - pkg/api/testing + - pkg/api/testing/fuzzer + - pkg/api/testing/roundtrip + - pkg/apimachinery + - pkg/apimachinery/announced + - pkg/apimachinery/registered + - pkg/apis/meta/fuzzer + - pkg/apis/meta/internalversion + - pkg/apis/meta/v1 + - pkg/apis/meta/v1/unstructured + - pkg/apis/meta/v1alpha1 + - pkg/conversion + - pkg/conversion/queryparams + - pkg/fields + - pkg/labels + - pkg/runtime + - pkg/runtime/schema + - pkg/runtime/serializer + - pkg/runtime/serializer/json + - pkg/runtime/serializer/protobuf + - pkg/runtime/serializer/recognizer + - pkg/runtime/serializer/streaming + - pkg/runtime/serializer/versioning + - pkg/selection + - pkg/types + - pkg/util/cache + - pkg/util/clock + - pkg/util/diff + - pkg/util/errors + - pkg/util/framer + - pkg/util/httpstream + - pkg/util/httpstream/spdy + - pkg/util/intstr + - pkg/util/json + - pkg/util/mergepatch + - pkg/util/net + - pkg/util/remotecommand + - pkg/util/runtime + - pkg/util/sets + - pkg/util/strategicpatch + - pkg/util/validation + - pkg/util/validation/field + - pkg/util/wait + - pkg/util/yaml + - pkg/version + - pkg/watch + - third_party/forked/golang/json + - third_party/forked/golang/netutil + - third_party/forked/golang/reflect +- name: k8s.io/client-go + version: 78700dec6369ba22221b72770783300f143df150 + subpackages: + - discovery + - kubernetes + - kubernetes/scheme + - kubernetes/typed/admissionregistration/v1alpha1 + - kubernetes/typed/admissionregistration/v1beta1 + - kubernetes/typed/apps/v1 + - kubernetes/typed/apps/v1beta1 + - kubernetes/typed/apps/v1beta2 + - kubernetes/typed/authentication/v1 + - kubernetes/typed/authentication/v1beta1 + - kubernetes/typed/authorization/v1 + - kubernetes/typed/authorization/v1beta1 + - kubernetes/typed/autoscaling/v1 + - kubernetes/typed/autoscaling/v2beta1 + - kubernetes/typed/batch/v1 + - kubernetes/typed/batch/v1beta1 + - kubernetes/typed/batch/v2alpha1 + - kubernetes/typed/certificates/v1beta1 + - kubernetes/typed/core/v1 + - kubernetes/typed/events/v1beta1 + - kubernetes/typed/extensions/v1beta1 + - kubernetes/typed/networking/v1 + - kubernetes/typed/policy/v1beta1 + - kubernetes/typed/rbac/v1 + - kubernetes/typed/rbac/v1alpha1 + - kubernetes/typed/rbac/v1beta1 + - kubernetes/typed/scheduling/v1alpha1 + - kubernetes/typed/settings/v1alpha1 + - kubernetes/typed/storage/v1 + - kubernetes/typed/storage/v1alpha1 + - kubernetes/typed/storage/v1beta1 + - pkg/version + - rest + - rest/watch + - tools/auth + - tools/clientcmd + - tools/clientcmd/api + - tools/clientcmd/api/latest + - tools/clientcmd/api/v1 + - tools/metrics + - tools/reference + - transport + - util/cert + - util/flowcontrol + - util/homedir + - util/integer +- name: k8s.io/kube-openapi + version: 39a7bf85c140f972372c2a0d1ee40adbf0c8bfe1 + subpackages: + - pkg/common + - pkg/util/proto +testImports: [] diff --git a/glide.yaml b/glide.yaml new file mode 100644 index 0000000..7cf3178 --- /dev/null +++ b/glide.yaml @@ -0,0 +1,21 @@ +package: github.com/deliveroo/paddle +import: +- package: github.com/aws/aws-sdk-go + version: ^1.12.52 + subpackages: + - aws + - aws/session + - service/s3 + - service/s3/s3manager +- package: github.com/mitchellh/go-homedir +- package: github.com/spf13/afero + version: ^1.0.0 +- package: github.com/spf13/cobra + version: ^0.0.1 +- package: github.com/spf13/viper + version: ^1.0.0 +- package: k8s.io/client-go + version: v6.0.0 +- package: github.com/gregjones/httpcache # k8s dependency +- package: github.com/imdario/mergo # k8s dependency +- package: gopkg.in/yaml.v2 From 081a0f46f5e22d24573cde795d1ae0a8931a1a35 Mon Sep 17 00:00:00 2001 From: Ivan Pirlik Date: Fri, 22 Dec 2017 19:00:51 +0000 Subject: [PATCH 02/17] Attempt decoding pod --- cli/pipeline/run.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/cli/pipeline/run.go b/cli/pipeline/run.go index 7c53869..8bc2f59 100644 --- a/cli/pipeline/run.go +++ b/cli/pipeline/run.go @@ -19,6 +19,7 @@ import ( "io/ioutil" "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" ) var stepName string @@ -63,6 +64,12 @@ func runPipeline(path string) { } fmt.Printf("{}", list) for _, step := range pipeline.Steps { - compilePodTemplate(pipeline, &step) + stepPod := compilePodTemplate(pipeline, &step) + decode := scheme.Codecs.UniversalDeserializer().Decode + obj, groupVersionKind, err := decode([]byte(stepPod), nil, nil) + + if err != nil { + log.Fatal(fmt.Sprintf("Error while decoding YAML object. Err was: %s", err)) + } } } From 4415e63fb8dbbbb2b4abedfae660254c96d79aef Mon Sep 17 00:00:00 2001 From: Ivan Pirlik Date: Sat, 23 Dec 2017 01:13:54 +0000 Subject: [PATCH 03/17] Run and watch pipeline --- cli/pipeline/run.go | 164 ++++++++++++++++++++++++++++++++++----- cli/pipeline/template.go | 23 +++--- cli/pipeline/watch.go | 48 ++++++++++++ glide.lock | 34 ++++---- glide.yaml | 8 +- 5 files changed, 227 insertions(+), 50 deletions(-) create mode 100644 cli/pipeline/watch.go diff --git a/cli/pipeline/run.go b/cli/pipeline/run.go index 8bc2f59..c36bd2c 100644 --- a/cli/pipeline/run.go +++ b/cli/pipeline/run.go @@ -17,15 +17,29 @@ import ( "fmt" "github.com/spf13/cobra" "io/ioutil" - "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/util/yaml" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" + "log" + "time" ) -var stepName string +type runCmdFlagsStruct struct { + StepName string +} + +const defaultPollInterval = 5 * time.Second +const defaultTimeout = 120 * time.Second + +var runCmdFlags *runCmdFlagsStruct +var clientset *kubernetes.Clientset var runCmd = &cobra.Command{ - Use: "run [pipeline_yaml] [-s step_name]", + Use: "run [pipeline_yaml]", Short: "Run a pipeline or a pipeline step", Args: cobra.ExactArgs(1), Long: `Store data into S3 under a versioned path, and update HEAD. @@ -35,41 +49,153 @@ Example: $ paddle pipeline run test_pipeline.yaml `, Run: func(cmd *cobra.Command, args []string) { - runPipeline(args[0]) + runPipeline(args[0], runCmdFlags) }, } func init() { - runCmd.Flags().StringVarP(&stepName, "step", "s", "", "Single step to execute") -} + runCmdFlags = &runCmdFlagsStruct{} + runCmd.Flags().StringVarP(&runCmdFlags.StepName, "step", "s", "", "Single step to execute") -func runPipeline(path string) { config, err := getKubernetesConfig() if err != nil { panic(err.Error()) } - clientset, err := kubernetes.NewForConfig(config) + clientset, err = kubernetes.NewForConfig(config) if err != nil { panic(err.Error()) } +} + +func runPipeline(path string, flags *runCmdFlagsStruct) { data, err := ioutil.ReadFile(path) if err != nil { panic(err.Error()) } pipeline := parsePipeline(data) - namespace := pipeline.Namespace - list, err := clientset.CoreV1().Pods(namespace).List(v1.ListOptions{}) + // namespace := pipeline.Namespace + // list, err := clientset.CoreV1().Pods(namespace).List(metav1.ListOptions{}) + // if err != nil { + // panic(err.Error()) + // } + // fmt.Printf("{}\n", list) + for _, step := range pipeline.Steps { + if flags.StepName != "" && step.Step != flags.StepName { + continue + } + err = runPipelineStep(pipeline, &step, flags) + if err != nil { + log.Fatalf("pipeline step failed: %s", err.Error()) + } + } + // for _, step := range pipeline.Steps { + // stepPod := compilePodTemplate(pipeline, &step) + // decode := scheme.Codecs.UniversalDeserializer().Decode + // obj, groupVersionKind, err := decode([]byte(stepPod), nil, nil) + + // if err != nil { + // log.Fatal(fmt.Sprintf("Error while decoding YAML object. Err was: %s", err)) + // } + // } +} + +func runPipelineStep(pipeline *PipelineDefinition, step *PipelineDefinitionStep, flags *runCmdFlagsStruct) error { + log.Printf("[paddle] Running step %s", step.Step) + podDefinition := NewPodDefinition(pipeline, step) + stepPodBuffer := podDefinition.compile() + pod := &v1.Pod{} + yaml.NewYAMLOrJSONDecoder(stepPodBuffer, 4096).Decode(pod) + pods := clientset.CoreV1().Pods(pipeline.Namespace) + + err := deleteAndWait(clientset, podDefinition) if err != nil { - panic(err.Error()) + return err } - fmt.Printf("{}", list) - for _, step := range pipeline.Steps { - stepPod := compilePodTemplate(pipeline, &step) - decode := scheme.Codecs.UniversalDeserializer().Decode - obj, groupVersionKind, err := decode([]byte(stepPod), nil, nil) - if err != nil { - log.Fatal(fmt.Sprintf("Error while decoding YAML object. Err was: %s", err)) + pod, err = pods.Create(pod) + if err != nil { + return err + } + + stopWatching := make(chan bool) + defer close(stopWatching) + + watcher, err := NewPodWatcher(clientset, pod, stopWatching) + if err != nil { + return err + } + + for { + event, ok := <-watcher + if !ok { + stopWatching <- true + return fmt.Errorf("pod %s channel has been closed ", pod.Name) + } + switch event.Object.(type) { + case *v1.Pod: + eventPod := event.Object.(*v1.Pod) + switch event.Type { + case watch.Added, watch.Modified: + if eventPod.Status.Phase == v1.PodSucceeded { + watcher = nil + break + log.Printf("Post succeed") + + } + if eventPod.Status.Phase == v1.PodFailed { + stopWatching <- true + return fmt.Errorf("pod failed: '%s'", eventPod.Status.Message) + } + for i := 0; i < len(eventPod.Status.ContainerStatuses); i++ { + containerStatus := eventPod.Status.ContainerStatuses[i] + term := containerStatus.State.Terminated + if term != nil && term.ExitCode != 0 { + return fmt.Errorf("pod container %s exited with error %s", containerStatus.Name, term.Message) + } + } + case watch.Deleted: + stopWatching <- true + return fmt.Errorf("pod deleted") + case watch.Error: + stopWatching <- true + return fmt.Errorf("pod error") + } + } + if watcher == nil { + break } } + + stopWatching <- true + + err = pods.Delete(podDefinition.PodName, &metav1.DeleteOptions{}) + if err != nil { + return err + } + log.Printf("[paddle] Finishing pod execution") + return nil +} + +func deleteAndWait(c *kubernetes.Clientset, podDefinition *PodDefinition) error { + pods := clientset.CoreV1().Pods(podDefinition.Namespace) + deleting := false + err := wait.PollImmediate(defaultPollInterval, defaultTimeout, func() (bool, error) { + var err error + err = pods.Delete(podDefinition.PodName, &metav1.DeleteOptions{}) + if err != nil { + if errors.IsNotFound(err) { + return true, nil + } else { + return true, err + } + } + if deleting { + log.Print("[paddle] .") + } else { + log.Printf("[paddle] deleting pod %s", podDefinition.PodName) + deleting = true + } + return false, nil + }) + return err } diff --git a/cli/pipeline/template.go b/cli/pipeline/template.go index b669375..d8d9e31 100644 --- a/cli/pipeline/template.go +++ b/cli/pipeline/template.go @@ -1,13 +1,13 @@ package pipeline import ( + "bytes" "fmt" - "os" "strings" "text/template" ) -type PodTemplateData struct { +type PodDefinition struct { PodName string StepName string BranchName string @@ -123,14 +123,11 @@ spec: key: aws-secret-access-key ` -func compilePodTemplate(pipelineDefinition *PipelineDefinition, pipelineDefinitionStep *PipelineDefinitionStep) string { - fmap := template.FuncMap{ - "sanitizeName": sanitizeName, - } +func NewPodDefinition(pipelineDefinition *PipelineDefinition, pipelineDefinitionStep *PipelineDefinitionStep) *PodDefinition { stepName := sanitizeName(pipelineDefinitionStep.Step) branchName := sanitizeName(pipelineDefinitionStep.Branch) podName := fmt.Sprintf("%s-%s-%s", sanitizeName(pipelineDefinition.Pipeline), stepName, branchName) - templateData := PodTemplateData{ + return &PodDefinition{ PodName: podName, Namespace: pipelineDefinition.Namespace, Step: *pipelineDefinitionStep, @@ -138,12 +135,20 @@ func compilePodTemplate(pipelineDefinition *PipelineDefinition, pipelineDefiniti StepName: stepName, BranchName: branchName, } + +} + +func (p PodDefinition) compile() *bytes.Buffer { + fmap := template.FuncMap{ + "sanitizeName": sanitizeName, + } tmpl := template.Must(template.New("podTemplate").Funcs(fmap).Parse(podTemplate)) - err := tmpl.Execute(os.Stdout, templateData) + buffer := new(bytes.Buffer) + err := tmpl.Execute(buffer, p) if err != nil { panic(err.Error()) } - return "" + return buffer } func sanitizeName(name string) string { diff --git a/cli/pipeline/watch.go b/cli/pipeline/watch.go new file mode 100644 index 0000000..7873c77 --- /dev/null +++ b/cli/pipeline/watch.go @@ -0,0 +1,48 @@ +package pipeline + +import ( + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" +) + +func NewPodWatcher(c *kubernetes.Clientset, pod *v1.Pod, stopChannel chan bool) (<-chan watch.Event, error) { + podSelector, err := fields.ParseSelector("metadata.name=" + pod.Name) + if err != nil { + return nil, err + } + options := metav1.ListOptions{ + FieldSelector: podSelector.String(), + Watch: true, + } + + podWatch, err := c.CoreV1().Pods(pod.Namespace).Watch(options) + + eventCh := make(chan watch.Event, 30) + + go func() { + defer podWatch.Stop() + defer close(eventCh) + var podWatchChannelClosed bool + for { + select { + case _ = <-stopChannel: + return + + case podEvent, ok := <-podWatch.ResultChan(): + if !ok { + podWatchChannelClosed = true + } else { + eventCh <- podEvent + } + } + if podWatchChannelClosed { + break + } + } + }() + + return eventCh, nil +} diff --git a/glide.lock b/glide.lock index 845dc0c..5bf8ae6 100644 --- a/glide.lock +++ b/glide.lock @@ -1,8 +1,8 @@ -hash: 4a4c528b40066dab94dbbb908b88a4bb99fc963edf6a538ed47d49d778f71d3c -updated: 2017-12-22T16:44:13.9306187Z +hash: 8be82b4770683548c48ea100f6395f68896018f3bc2119d69e22d459aeea91ea +updated: 2017-12-22T22:34:41.613793Z imports: - name: github.com/aws/aws-sdk-go - version: 32d0e45c3f93cd20c25614183246d7e34bc7385c + version: 82ad808f2307df0776c038bfd7ea85440a35c02e subpackages: - aws - aws/awserr @@ -31,6 +31,10 @@ imports: - service/s3/s3iface - service/s3/s3manager - service/sts +- name: github.com/davecgh/go-spew + version: 346938d642f2ec3594ed81d874461961cd0faa76 + subpackages: + - spew - name: github.com/emicklei/go-restful version: ff4f55a206334ef123e4f79bbf348980da81ca46 subpackages: @@ -78,6 +82,10 @@ imports: version: 787624de3eb7bd915c329cba748687a3b22666a6 subpackages: - diskcache +- name: github.com/hashicorp/golang-lru + version: a0d98a5f288019575c6d1f4bb1573fef2d1fcdc4 + subpackages: + - simplelru - name: github.com/hashicorp/hcl version: 23c074d0eceb2b8a5bfdbb271ab780cde70f05a8 subpackages: @@ -143,7 +151,6 @@ imports: version: 1c05540f6879653db88113bc4a2b70aec4bd491f subpackages: - context - - context/ctxhttp - http2 - http2/hpack - idna @@ -206,17 +213,9 @@ imports: - name: k8s.io/apimachinery version: 180eddb345a5be3a157cea1c624700ad5bd27b8f subpackages: - - pkg/api/equality - pkg/api/errors - pkg/api/meta - pkg/api/resource - - pkg/api/testing - - pkg/api/testing/fuzzer - - pkg/api/testing/roundtrip - - pkg/apimachinery - - pkg/apimachinery/announced - - pkg/apimachinery/registered - - pkg/apis/meta/fuzzer - pkg/apis/meta/internalversion - pkg/apis/meta/v1 - pkg/apis/meta/v1/unstructured @@ -240,24 +239,17 @@ imports: - pkg/util/diff - pkg/util/errors - pkg/util/framer - - pkg/util/httpstream - - pkg/util/httpstream/spdy - pkg/util/intstr - pkg/util/json - - pkg/util/mergepatch - pkg/util/net - - pkg/util/remotecommand - pkg/util/runtime - pkg/util/sets - - pkg/util/strategicpatch - pkg/util/validation - pkg/util/validation/field - pkg/util/wait - pkg/util/yaml - pkg/version - pkg/watch - - third_party/forked/golang/json - - third_party/forked/golang/netutil - third_party/forked/golang/reflect - name: k8s.io/client-go version: 78700dec6369ba22221b72770783300f143df150 @@ -297,13 +289,16 @@ imports: - rest - rest/watch - tools/auth + - tools/cache - tools/clientcmd - tools/clientcmd/api - tools/clientcmd/api/latest - tools/clientcmd/api/v1 - tools/metrics + - tools/pager - tools/reference - transport + - util/buffer - util/cert - util/flowcontrol - util/homedir @@ -312,5 +307,4 @@ imports: version: 39a7bf85c140f972372c2a0d1ee40adbf0c8bfe1 subpackages: - pkg/common - - pkg/util/proto testImports: [] diff --git a/glide.yaml b/glide.yaml index 7cf3178..2ec370e 100644 --- a/glide.yaml +++ b/glide.yaml @@ -16,6 +16,10 @@ import: version: ^1.0.0 - package: k8s.io/client-go version: v6.0.0 -- package: github.com/gregjones/httpcache # k8s dependency -- package: github.com/imdario/mergo # k8s dependency +- package: github.com/gregjones/httpcache +- package: github.com/imdario/mergo - package: gopkg.in/yaml.v2 +- package: github.com/davecgh/go-spew + version: ^1.1.0 + subpackages: + - spew From 8a489771fd31a75e5caf164ad18d8317471e0727 Mon Sep 17 00:00:00 2001 From: Ivan Pirlik Date: Mon, 1 Jan 2018 13:27:56 +0100 Subject: [PATCH 04/17] Initial watch logs implementation --- cli/pipeline/run.go | 86 +++++++++++++++++++++++++++++++--------- cli/pipeline/template.go | 2 +- 2 files changed, 69 insertions(+), 19 deletions(-) diff --git a/cli/pipeline/run.go b/cli/pipeline/run.go index c36bd2c..27b10f4 100644 --- a/cli/pipeline/run.go +++ b/cli/pipeline/run.go @@ -14,8 +14,10 @@ package pipeline import ( + "bufio" "fmt" "github.com/spf13/cobra" + "io" "io/ioutil" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -29,7 +31,8 @@ import ( ) type runCmdFlagsStruct struct { - StepName string + StepName string + BucketName string } const defaultPollInterval = 5 * time.Second @@ -56,6 +59,7 @@ $ paddle pipeline run test_pipeline.yaml func init() { runCmdFlags = &runCmdFlagsStruct{} runCmd.Flags().StringVarP(&runCmdFlags.StepName, "step", "s", "", "Single step to execute") + runCmd.Flags().StringVarP(&runCmdFlags.BucketName, "bucket", "b", "", "Bucket name") config, err := getKubernetesConfig() if err != nil { @@ -73,30 +77,20 @@ func runPipeline(path string, flags *runCmdFlagsStruct) { panic(err.Error()) } pipeline := parsePipeline(data) - // namespace := pipeline.Namespace - // list, err := clientset.CoreV1().Pods(namespace).List(metav1.ListOptions{}) - // if err != nil { - // panic(err.Error()) - // } - // fmt.Printf("{}\n", list) + if flags.BucketName != "" { + pipeline.Bucket = flags.BucketName + } + for _, step := range pipeline.Steps { if flags.StepName != "" && step.Step != flags.StepName { continue } err = runPipelineStep(pipeline, &step, flags) if err != nil { + time.Sleep(10 * time.Second) log.Fatalf("pipeline step failed: %s", err.Error()) } } - // for _, step := range pipeline.Steps { - // stepPod := compilePodTemplate(pipeline, &step) - // decode := scheme.Codecs.UniversalDeserializer().Decode - // obj, groupVersionKind, err := decode([]byte(stepPod), nil, nil) - - // if err != nil { - // log.Fatal(fmt.Sprintf("Error while decoding YAML object. Err was: %s", err)) - // } - // } } func runPipelineStep(pipeline *PipelineDefinition, step *PipelineDefinitionStep, flags *runCmdFlagsStruct) error { @@ -125,6 +119,11 @@ func runPipelineStep(pipeline *PipelineDefinition, step *PipelineDefinitionStep, return err } + _, err = watchLogs(clientset, pod) + if err != nil { + return fmt.Errorf("Parsing logs failed: %s", err.Error()) + } + for { event, ok := <-watcher if !ok { @@ -139,8 +138,6 @@ func runPipelineStep(pipeline *PipelineDefinition, step *PipelineDefinitionStep, if eventPod.Status.Phase == v1.PodSucceeded { watcher = nil break - log.Printf("Post succeed") - } if eventPod.Status.Phase == v1.PodFailed { stopWatching <- true @@ -176,6 +173,59 @@ func runPipelineStep(pipeline *PipelineDefinition, step *PipelineDefinitionStep, return nil } +func watchLogs(client *kubernetes.Clientset, pod *v1.Pod) (<-chan string, error) { + logCh := make(chan string, 30) + + for i := 0; i < len(pod.Spec.Containers); i++ { + go func(i int) { + container := pod.Spec.Containers[i] + readCloser, err := client.Core().Pods(pod.Namespace).GetLogs(pod.Name, &v1.PodLogOptions{ + Container: container.Name, + Follow: true, + }).Stream() + + for { + + opened := false + if err != nil { + // if errors.IsNotFound(err) || errors.IsInvalid(err) { + log.Printf("notfound %s-%s, {}", pod.Name, container.Name, err) + time.Sleep(1 * time.Second) + // } else { + // log.Printf("Error reading log: %s", err.Error()) + // return + // } + } else { + log.Printf("opened %s", container.Name) + opened = true + } + if opened { + break + } + } + + log.Printf("Ok here") + + defer readCloser.Close() + reader := bufio.NewReader(readCloser) + + for { + log.Printf("Reading line for %s", container.Name) + line, err := reader.ReadBytes('\n') + if err != nil { + if err != io.EOF { + log.Printf("Error reading log line: %s", err.Error()) + } + return + } + str := string(line) + log.Printf("[paddle] [%s] %s", container.Name, str) + } + }(i) + } + return logCh, nil +} + func deleteAndWait(c *kubernetes.Clientset, podDefinition *PodDefinition) error { pods := clientset.CoreV1().Pods(podDefinition.Namespace) deleting := false diff --git a/cli/pipeline/template.go b/cli/pipeline/template.go index d8d9e31..2849e48 100644 --- a/cli/pipeline/template.go +++ b/cli/pipeline/template.go @@ -24,7 +24,7 @@ metadata: name: "{{ .PodName }}" namespace: {{ .Namespace }} labels: - pipeline: canoe + canoe: pipeline spec: restartPolicy: Never volumes: From 55dc093db88593fb6d89e2331945a63afb3779b8 Mon Sep 17 00:00:00 2001 From: Ivan Pirlik Date: Mon, 1 Jan 2018 19:25:40 +0100 Subject: [PATCH 05/17] Move monitor and log tailing to watch --- cli/pipeline/run.go | 141 +++++++++++------------------------------- cli/pipeline/watch.go | 131 +++++++++++++++++++++++++++++++++------ 2 files changed, 149 insertions(+), 123 deletions(-) diff --git a/cli/pipeline/run.go b/cli/pipeline/run.go index 27b10f4..361112b 100644 --- a/cli/pipeline/run.go +++ b/cli/pipeline/run.go @@ -14,17 +14,16 @@ package pipeline import ( - "bufio" + "context" + "errors" "fmt" "github.com/spf13/cobra" - "io" "io/ioutil" "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" + k8errors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/yaml" - "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "log" "time" @@ -87,8 +86,7 @@ func runPipeline(path string, flags *runCmdFlagsStruct) { } err = runPipelineStep(pipeline, &step, flags) if err != nil { - time.Sleep(10 * time.Second) - log.Fatalf("pipeline step failed: %s", err.Error()) + log.Fatalf("[paddle] %s", err.Error()) } } } @@ -111,121 +109,54 @@ func runPipelineStep(pipeline *PipelineDefinition, step *PipelineDefinitionStep, return err } - stopWatching := make(chan bool) - defer close(stopWatching) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - watcher, err := NewPodWatcher(clientset, pod, stopWatching) + watch, err := Watch(ctx, clientset, pod) if err != nil { return err } - _, err = watchLogs(clientset, pod) - if err != nil { - return fmt.Errorf("Parsing logs failed: %s", err.Error()) - } + containers := make(map[string]bool) for { - event, ok := <-watcher - if !ok { - stopWatching <- true - return fmt.Errorf("pod %s channel has been closed ", pod.Name) - } - switch event.Object.(type) { - case *v1.Pod: - eventPod := event.Object.(*v1.Pod) - switch event.Type { - case watch.Added, watch.Modified: - if eventPod.Status.Phase == v1.PodSucceeded { - watcher = nil - break - } - if eventPod.Status.Phase == v1.PodFailed { - stopWatching <- true - return fmt.Errorf("pod failed: '%s'", eventPod.Status.Message) + e := <-watch + switch e.Type { + case Added: + log.Printf("[paddle] Container %s/%s starting", pod.Name, e.Container) + containers[e.Container] = true + TailLogs(ctx, clientset, e.Pod, e.Container) + case Deleted: + case Removed: + log.Printf("[paddle] Container removed: %s", e.Container) + continue + case Completed: + log.Printf("[paddle] Pod execution completed") + return nil + case Failed: + var msg string + if e.Container != "" { + if e.Message != "" { + msg = fmt.Sprintf("Container %s/%s failed: '%s'", pod.Name, e.Container, e.Message) + } else { + msg = fmt.Sprintf("Container %s/%s failed", pod.Name, e.Container) } - for i := 0; i < len(eventPod.Status.ContainerStatuses); i++ { - containerStatus := eventPod.Status.ContainerStatuses[i] - term := containerStatus.State.Terminated - if term != nil && term.ExitCode != 0 { - return fmt.Errorf("pod container %s exited with error %s", containerStatus.Name, term.Message) - } + _, present := containers[e.Container] + if !present { // container died before being added + TailLogs(ctx, clientset, e.Pod, e.Container) + time.Sleep(3 * time.Second) // give it time to tail logs } - case watch.Deleted: - stopWatching <- true - return fmt.Errorf("pod deleted") - case watch.Error: - stopWatching <- true - return fmt.Errorf("pod error") + } else { + msg = "Pod failed" } - } - if watcher == nil { - break + return errors.New(msg) } } - stopWatching <- true - - err = pods.Delete(podDefinition.PodName, &metav1.DeleteOptions{}) - if err != nil { - return err - } log.Printf("[paddle] Finishing pod execution") return nil } -func watchLogs(client *kubernetes.Clientset, pod *v1.Pod) (<-chan string, error) { - logCh := make(chan string, 30) - - for i := 0; i < len(pod.Spec.Containers); i++ { - go func(i int) { - container := pod.Spec.Containers[i] - readCloser, err := client.Core().Pods(pod.Namespace).GetLogs(pod.Name, &v1.PodLogOptions{ - Container: container.Name, - Follow: true, - }).Stream() - - for { - - opened := false - if err != nil { - // if errors.IsNotFound(err) || errors.IsInvalid(err) { - log.Printf("notfound %s-%s, {}", pod.Name, container.Name, err) - time.Sleep(1 * time.Second) - // } else { - // log.Printf("Error reading log: %s", err.Error()) - // return - // } - } else { - log.Printf("opened %s", container.Name) - opened = true - } - if opened { - break - } - } - - log.Printf("Ok here") - - defer readCloser.Close() - reader := bufio.NewReader(readCloser) - - for { - log.Printf("Reading line for %s", container.Name) - line, err := reader.ReadBytes('\n') - if err != nil { - if err != io.EOF { - log.Printf("Error reading log line: %s", err.Error()) - } - return - } - str := string(line) - log.Printf("[paddle] [%s] %s", container.Name, str) - } - }(i) - } - return logCh, nil -} - func deleteAndWait(c *kubernetes.Clientset, podDefinition *PodDefinition) error { pods := clientset.CoreV1().Pods(podDefinition.Namespace) deleting := false @@ -233,7 +164,7 @@ func deleteAndWait(c *kubernetes.Clientset, podDefinition *PodDefinition) error var err error err = pods.Delete(podDefinition.PodName, &metav1.DeleteOptions{}) if err != nil { - if errors.IsNotFound(err) { + if k8errors.IsNotFound(err) { return true, nil } else { return true, err diff --git a/cli/pipeline/watch.go b/cli/pipeline/watch.go index 7873c77..c3edc41 100644 --- a/cli/pipeline/watch.go +++ b/cli/pipeline/watch.go @@ -1,15 +1,35 @@ package pipeline import ( + "bufio" + "context" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" + "log" ) -func NewPodWatcher(c *kubernetes.Clientset, pod *v1.Pod, stopChannel chan bool) (<-chan watch.Event, error) { - podSelector, err := fields.ParseSelector("metadata.name=" + pod.Name) +type WatchEventType string + +const ( + Added WatchEventType = "ADDED" + Deleted WatchEventType = "DELETED" + Removed WatchEventType = "REMOVED" + Completed WatchEventType = "COMPLETED" + Failed WatchEventType = "FAILED" +) + +type WatchEvent struct { + Type WatchEventType + Pod *v1.Pod + Container string + Message string +} + +func Watch(ctx context.Context, c *kubernetes.Clientset, watchPod *v1.Pod) (<-chan WatchEvent, error) { + podSelector, err := fields.ParseSelector("metadata.name=" + watchPod.Name) if err != nil { return nil, err } @@ -18,31 +38,106 @@ func NewPodWatcher(c *kubernetes.Clientset, pod *v1.Pod, stopChannel chan bool) Watch: true, } - podWatch, err := c.CoreV1().Pods(pod.Namespace).Watch(options) + watcher, err := c.CoreV1().Pods(watchPod.Namespace).Watch(options) + + out := make(chan WatchEvent) - eventCh := make(chan watch.Event, 30) + containers := make(map[string]bool) go func() { - defer podWatch.Stop() - defer close(eventCh) - var podWatchChannelClosed bool for { select { - case _ = <-stopChannel: - return + case e := <-watcher.ResultChan(): + if e.Object == nil { + // Closed because of error + return + } - case podEvent, ok := <-podWatch.ResultChan(): - if !ok { - podWatchChannelClosed = true - } else { - eventCh <- podEvent + pod := e.Object.(*v1.Pod) + + switch e.Type { + case watch.Added, watch.Modified: + if pod.Status.Phase == v1.PodSucceeded { + out <- WatchEvent{Completed, pod, "", ""} + } else if pod.Status.Phase == v1.PodFailed { + out <- WatchEvent{Failed, pod, "", ""} + } else { + for _, container := range pod.Status.ContainerStatuses { + if container.State.Running != nil { + _, present := containers[container.Name] + if !present { + out <- WatchEvent{Added, pod, container.Name, ""} + containers[container.Name] = true + } + } else if container.State.Terminated != nil { + _, present := containers[container.Name] + if present { + out <- WatchEvent{Removed, pod, container.Name, ""} + containers[container.Name] = false + } + if container.State.Terminated.ExitCode != 0 { + out <- WatchEvent{Failed, pod, container.Name, container.State.Terminated.Message} + } + } + } + } + case watch.Deleted: + out <- WatchEvent{Deleted, pod, "", ""} + case watch.Error: + log.Printf("Pod error") } - } - if podWatchChannelClosed { - break + case <-ctx.Done(): + watcher.Stop() + close(out) + return } } }() - return eventCh, nil + return out, nil +} + +func TailLogs(ctx context.Context, c *kubernetes.Clientset, pod *v1.Pod, container string) { + pods := c.Core().Pods(pod.Namespace) + + req := pods.GetLogs(pod.Name, &v1.PodLogOptions{ + Container: container, + Follow: true, + }) + + closed := make(chan struct{}) + + go func() { + + stream, err := req.Stream() + + if err != nil { + log.Fatalf("Error opening log stream for pod %s", pod.Name) + } + + defer stream.Close() + + go func() { + <-closed + stream.Close() + }() + + go func() { + <-ctx.Done() + close(closed) + }() + + reader := bufio.NewReader(stream) + + for { + line, err := reader.ReadBytes('\n') + if err != nil { + return + } + + str := string(line) + + log.Printf("[%s/%s]: %s", pod.Name, container, str) + } + }() } From c7b5920e829be09fc11c0a03fab610340f22696c Mon Sep 17 00:00:00 2001 From: Ivan Pirlik Date: Mon, 1 Jan 2018 19:42:34 +0100 Subject: [PATCH 06/17] Extract default bucket in Ansible format --- cli/pipeline/run.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/cli/pipeline/run.go b/cli/pipeline/run.go index 361112b..8009f54 100644 --- a/cli/pipeline/run.go +++ b/cli/pipeline/run.go @@ -26,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/util/yaml" "k8s.io/client-go/kubernetes" "log" + "regexp" "time" ) @@ -79,6 +80,12 @@ func runPipeline(path string, flags *runCmdFlagsStruct) { if flags.BucketName != "" { pipeline.Bucket = flags.BucketName } + // For compatibility with Ansible executor + r, _ := regexp.Compile("default\\('(.+)'\\)") + matches := r.FindStringSubmatch(pipeline.Bucket) + if matches != nil && matches[1] != "" { + pipeline.Bucket = matches[1] + } for _, step := range pipeline.Steps { if flags.StepName != "" && step.Step != flags.StepName { @@ -165,6 +172,9 @@ func deleteAndWait(c *kubernetes.Clientset, podDefinition *PodDefinition) error err = pods.Delete(podDefinition.PodName, &metav1.DeleteOptions{}) if err != nil { if k8errors.IsNotFound(err) { + if deleting { + log.Printf("[paddle] deleted pod %s", podDefinition.PodName) + } return true, nil } else { return true, err From e2211c491445e824845d3d656faaa3d6a685348a Mon Sep 17 00:00:00 2001 From: Ivan Pirlik Date: Tue, 2 Jan 2018 18:13:54 +0100 Subject: [PATCH 07/17] Add run tests --- README.md | 6 + cli/pipeline/run.go | 28 ++-- cli/pipeline/run_test.go | 161 +++++++++++++++++++++ cli/pipeline/test/sample_steps_passing.yml | 35 +++++ cli/pipeline/watch.go | 4 +- 5 files changed, 221 insertions(+), 13 deletions(-) create mode 100644 cli/pipeline/run_test.go create mode 100644 cli/pipeline/test/sample_steps_passing.yml diff --git a/README.md b/README.md index 12e1798..59b80f6 100644 --- a/README.md +++ b/README.md @@ -44,6 +44,12 @@ region=eu-west-1 $ go build ``` +## Testing + +``` +$ go test ./... +``` + ## Release In order to release a new version, set up github export GITHUB_TOKEN=[YOUR_TOKEN] and do the following steps: diff --git a/cli/pipeline/run.go b/cli/pipeline/run.go index 8009f54..b2c8e9f 100644 --- a/cli/pipeline/run.go +++ b/cli/pipeline/run.go @@ -33,13 +33,16 @@ import ( type runCmdFlagsStruct struct { StepName string BucketName string + TailLogs bool } -const defaultPollInterval = 5 * time.Second +const defaultPollInterval = 2 * time.Second const defaultTimeout = 120 * time.Second var runCmdFlags *runCmdFlagsStruct -var clientset *kubernetes.Clientset +var clientset kubernetes.Interface + +var logFatalf = log.Fatalf var runCmd = &cobra.Command{ Use: "run [pipeline_yaml]", @@ -60,6 +63,7 @@ func init() { runCmdFlags = &runCmdFlagsStruct{} runCmd.Flags().StringVarP(&runCmdFlags.StepName, "step", "s", "", "Single step to execute") runCmd.Flags().StringVarP(&runCmdFlags.BucketName, "bucket", "b", "", "Bucket name") + runCmd.Flags().BoolVarP(&runCmdFlags.TailLogs, "logs", "l", true, "Tail logs") config, err := getKubernetesConfig() if err != nil { @@ -93,7 +97,7 @@ func runPipeline(path string, flags *runCmdFlagsStruct) { } err = runPipelineStep(pipeline, &step, flags) if err != nil { - log.Fatalf("[paddle] %s", err.Error()) + logFatalf("[paddle] %s", err.Error()) } } } @@ -111,11 +115,6 @@ func runPipelineStep(pipeline *PipelineDefinition, step *PipelineDefinitionStep, return err } - pod, err = pods.Create(pod) - if err != nil { - return err - } - ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -124,6 +123,11 @@ func runPipelineStep(pipeline *PipelineDefinition, step *PipelineDefinitionStep, return err } + pod, err = pods.Create(pod) + if err != nil { + return err + } + containers := make(map[string]bool) for { @@ -132,7 +136,9 @@ func runPipelineStep(pipeline *PipelineDefinition, step *PipelineDefinitionStep, case Added: log.Printf("[paddle] Container %s/%s starting", pod.Name, e.Container) containers[e.Container] = true - TailLogs(ctx, clientset, e.Pod, e.Container) + if flags.TailLogs { + TailLogs(ctx, clientset, e.Pod, e.Container) + } case Deleted: case Removed: log.Printf("[paddle] Container removed: %s", e.Container) @@ -149,7 +155,7 @@ func runPipelineStep(pipeline *PipelineDefinition, step *PipelineDefinitionStep, msg = fmt.Sprintf("Container %s/%s failed", pod.Name, e.Container) } _, present := containers[e.Container] - if !present { // container died before being added + if !present && flags.TailLogs { // container died before being added TailLogs(ctx, clientset, e.Pod, e.Container) time.Sleep(3 * time.Second) // give it time to tail logs } @@ -164,7 +170,7 @@ func runPipelineStep(pipeline *PipelineDefinition, step *PipelineDefinitionStep, return nil } -func deleteAndWait(c *kubernetes.Clientset, podDefinition *PodDefinition) error { +func deleteAndWait(c kubernetes.Interface, podDefinition *PodDefinition) error { pods := clientset.CoreV1().Pods(podDefinition.Namespace) deleting := false err := wait.PollImmediate(defaultPollInterval, defaultTimeout, func() (bool, error) { diff --git a/cli/pipeline/run_test.go b/cli/pipeline/run_test.go new file mode 100644 index 0000000..4a6e127 --- /dev/null +++ b/cli/pipeline/run_test.go @@ -0,0 +1,161 @@ +package pipeline + +import ( + "fmt" + "k8s.io/api/core/v1" + k8errors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes/fake" + ktesting "k8s.io/client-go/testing" + "testing" + "time" +) + +func parseTimeOrDie(ts string) metav1.Time { + t, err := time.Parse(time.RFC3339, ts) + if err != nil { + panic(err) + } + return metav1.Time{Time: t} +} + +func createPodStatus(phase v1.PodPhase, containers map[string]bool) v1.PodStatus { + containerStatuses := make([]v1.ContainerStatus, len(containers)) + for container, running := range containers { + var state v1.ContainerState + if running { + state = v1.ContainerState{ + Running: &v1.ContainerStateRunning{ + StartedAt: parseTimeOrDie("2015-04-22T11:49:32Z"), + }, + } + } else { + state = v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{ + ExitCode: 1, + }, + } + } + containerStatuses = append(containerStatuses, v1.ContainerStatus{ + Name: container, + State: state, + Ready: true, + RestartCount: 0, + Image: "test.com/test", + ImageID: "docker://b6b9a86dc06aa1361357ca1b105feba961f6a4145adca6c54e142c0be0fe87b0", + ContainerID: "docker://b6b9a86dc06aa1361357ca1b105feba961f6a4145adca6c54e142c0be0fe87b0", + }) + } + + return v1.PodStatus{ + Phase: phase, + Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + Status: v1.ConditionTrue, + }, + }, + ContainerStatuses: containerStatuses, + } +} + +func TestRunPipelineSuccess(t *testing.T) { + client := fake.NewSimpleClientset() + clientset = client + + fakeWatch := watch.NewFake() + client.PrependWatchReactor("pods", ktesting.DefaultWatchReactor(fakeWatch, nil)) + + deleted := make(map[string]int) + + client.PrependReactor("delete", "pods", func(action ktesting.Action) (bool, runtime.Object, error) { + a := action.(ktesting.DeleteAction) + name := a.GetName() + deleted[name] += 1 + if deleted[name] < 2 { + return true, nil, nil + } else { + fakeWatch.Reset() + return true, nil, k8errors.NewNotFound(v1.Resource("pods"), name) + } + }) + + created := make(map[string]int) + + client.PrependReactor("create", "pods", func(action ktesting.Action) (bool, runtime.Object, error) { + a := action.(ktesting.CreateAction) + object := a.GetObject() + pod := object.(*v1.Pod) + created[pod.Name] += 1 + go func() { + p := pod.DeepCopy() + p.Status = createPodStatus(v1.PodRunning, map[string]bool{pod.Name + "/main": true, pod.Name + "/paddle": true}) + fakeWatch.Add(p) + time.Sleep(1 * time.Second) + p = p.DeepCopy() + p.Status = createPodStatus(v1.PodSucceeded, map[string]bool{pod.Name + "/main": true, pod.Name + "/paddle": true}) + fakeWatch.Modify(p) + }() + return true, object, nil + }) + + runPipeline("test/sample_steps_passing.yml", &runCmdFlagsStruct{TailLogs: false}) + + expectPods := [2]string{"sample-steps-passing-step1-master", "sample-steps-passing-step2-master"} + + for _, p := range expectPods { + if deleted["sample-steps-passing-step1-master"] != 2 { + t.Errorf("excepted delete of "+p+" to be called twice, got %i", deleted[p]) + } + if created[p] != 1 { + t.Errorf("excepted create of "+p+" to be called once, got %i", created[p]) + } + } +} + +func TestRunPipelineFailure(t *testing.T) { + origLogFatalf := logFatalf + + // after this test, replace the original fatal function + defer func() { logFatalf = origLogFatalf }() + + errors := []string{} + logFatalf = func(format string, args ...interface{}) { + if len(args) > 0 { + errors = append(errors, fmt.Sprintf(format, args)) + } else { + errors = append(errors, format) + } + } + + client := fake.NewSimpleClientset() + clientset = client + + fakeWatch := watch.NewFake() + client.PrependWatchReactor("pods", ktesting.DefaultWatchReactor(fakeWatch, nil)) + + client.PrependReactor("delete", "pods", func(action ktesting.Action) (bool, runtime.Object, error) { + fakeWatch.Reset() + return true, nil, k8errors.NewNotFound(v1.Resource("pods"), action.(ktesting.DeleteAction).GetName()) + }) + + client.PrependReactor("create", "pods", func(action ktesting.Action) (bool, runtime.Object, error) { + a := action.(ktesting.CreateAction) + object := a.GetObject() + pod := object.(*v1.Pod) + go func() { + p := pod.DeepCopy() + p.Status = createPodStatus(v1.PodRunning, map[string]bool{pod.Name + "/main": true, pod.Name + "/paddle": false}) + fakeWatch.Add(p) + }() + return true, object, nil + }) + + runPipeline("test/sample_steps_passing.yml", &runCmdFlagsStruct{TailLogs: false}) + + if len(errors) != 2 { + t.Errorf("excepted two errors, actual %v", len(errors)) + } +} diff --git a/cli/pipeline/test/sample_steps_passing.yml b/cli/pipeline/test/sample_steps_passing.yml new file mode 100644 index 0000000..d43cc1c --- /dev/null +++ b/cli/pipeline/test/sample_steps_passing.yml @@ -0,0 +1,35 @@ +pipeline: sample-steps-passing +bucket: "{{ s3_bucket_name | default('canoe-sample-pipeline') }}" +namespace: modeltraining + +steps: + + - + step: step1 + version: version1 + inputs: [] + image: 219541440308.dkr.ecr.eu-west-1.amazonaws.com/paddlecontainer:latest + branch: master + commands: + - echo executing sample-pipeline-data > ${OUTPUT_PATH}/sample-pipeline-data.txt + resources: + cpu: 1 + memory: 1Gi + + - + step: step2 + version: version1a + inputs: + - + step: step1 + version: version1 + branch: master + path: HEAD + image: 219541440308.dkr.ecr.eu-west-1.amazonaws.com/paddlecontainer:latest + branch: master + commands: + - echo executing sample-pipeline-data > ${OUTPUT_PATH}/sample-pipeline-data-model.txt + resources: + cpu: 2 + memory: 2Gi + diff --git a/cli/pipeline/watch.go b/cli/pipeline/watch.go index c3edc41..dec6b86 100644 --- a/cli/pipeline/watch.go +++ b/cli/pipeline/watch.go @@ -28,7 +28,7 @@ type WatchEvent struct { Message string } -func Watch(ctx context.Context, c *kubernetes.Clientset, watchPod *v1.Pod) (<-chan WatchEvent, error) { +func Watch(ctx context.Context, c kubernetes.Interface, watchPod *v1.Pod) (<-chan WatchEvent, error) { podSelector, err := fields.ParseSelector("metadata.name=" + watchPod.Name) if err != nil { return nil, err @@ -97,7 +97,7 @@ func Watch(ctx context.Context, c *kubernetes.Clientset, watchPod *v1.Pod) (<-ch return out, nil } -func TailLogs(ctx context.Context, c *kubernetes.Clientset, pod *v1.Pod, container string) { +func TailLogs(ctx context.Context, c kubernetes.Interface, pod *v1.Pod, container string) { pods := c.Core().Pods(pod.Namespace) req := pods.GetLogs(pod.Name, &v1.PodLogOptions{ From 09a49addfa63680e9a6c8bdd433207a73dd94748 Mon Sep 17 00:00:00 2001 From: Ivan Pirlik Date: Tue, 2 Jan 2018 19:06:46 +0100 Subject: [PATCH 08/17] Add more tests --- cli/pipeline/pipeline_definition.go | 9 ++++ cli/pipeline/pipeline_definition_test.go | 22 +++++++++ cli/pipeline/run.go | 25 ++++------ cli/pipeline/run_test.go | 8 +-- cli/pipeline/template_test.go | 30 +++++++++++ cli/pipeline/watch_test.go | 63 ++++++++++++++++++++++++ 6 files changed, 139 insertions(+), 18 deletions(-) create mode 100644 cli/pipeline/pipeline_definition_test.go create mode 100644 cli/pipeline/template_test.go create mode 100644 cli/pipeline/watch_test.go diff --git a/cli/pipeline/pipeline_definition.go b/cli/pipeline/pipeline_definition.go index b5a0b18..cf18b9e 100644 --- a/cli/pipeline/pipeline_definition.go +++ b/cli/pipeline/pipeline_definition.go @@ -3,6 +3,7 @@ package pipeline import ( "gopkg.in/yaml.v2" "log" + "regexp" ) type PipelineDefinitionStep struct { @@ -37,5 +38,13 @@ func parsePipeline(data []byte) *PipelineDefinition { if err != nil { log.Fatalf("error: %v", err) } + + // For compatibility with Ansible executor + r, _ := regexp.Compile("default\\('(.+)'\\)") + matches := r.FindStringSubmatch(pipeline.Bucket) + if matches != nil && matches[1] != "" { + pipeline.Bucket = matches[1] + } + return &pipeline } diff --git a/cli/pipeline/pipeline_definition_test.go b/cli/pipeline/pipeline_definition_test.go new file mode 100644 index 0000000..6bf23c0 --- /dev/null +++ b/cli/pipeline/pipeline_definition_test.go @@ -0,0 +1,22 @@ +package pipeline + +import ( + "io/ioutil" + "testing" +) + +func TestParsePipeline(t *testing.T) { + data, err := ioutil.ReadFile("test/sample_steps_passing.yml") + if err != nil { + panic(err.Error()) + } + pipeline := parsePipeline(data) + + if len(pipeline.Steps) != 2 { + t.Errorf("excepted two steps, got %i", len(pipeline.Steps)) + } + + if pipeline.Bucket != "canoe-sample-pipeline" { + t.Errorf("Expected bucket to be canoe-sample-pipeline, got %s", pipeline.Bucket) + } +} diff --git a/cli/pipeline/run.go b/cli/pipeline/run.go index b2c8e9f..2556d72 100644 --- a/cli/pipeline/run.go +++ b/cli/pipeline/run.go @@ -26,18 +26,18 @@ import ( "k8s.io/apimachinery/pkg/util/yaml" "k8s.io/client-go/kubernetes" "log" - "regexp" "time" ) type runCmdFlagsStruct struct { - StepName string - BucketName string - TailLogs bool + StepName string + BucketName string + TailLogs bool + DeletePollInterval time.Duration } -const defaultPollInterval = 2 * time.Second -const defaultTimeout = 120 * time.Second +const defaultDeletePollInterval = 2 * time.Second +const deleteTimeout = 120 * time.Second var runCmdFlags *runCmdFlagsStruct var clientset kubernetes.Interface @@ -64,6 +64,7 @@ func init() { runCmd.Flags().StringVarP(&runCmdFlags.StepName, "step", "s", "", "Single step to execute") runCmd.Flags().StringVarP(&runCmdFlags.BucketName, "bucket", "b", "", "Bucket name") runCmd.Flags().BoolVarP(&runCmdFlags.TailLogs, "logs", "l", true, "Tail logs") + runCmdFlags.DeletePollInterval = defaultDeletePollInterval config, err := getKubernetesConfig() if err != nil { @@ -84,12 +85,6 @@ func runPipeline(path string, flags *runCmdFlagsStruct) { if flags.BucketName != "" { pipeline.Bucket = flags.BucketName } - // For compatibility with Ansible executor - r, _ := regexp.Compile("default\\('(.+)'\\)") - matches := r.FindStringSubmatch(pipeline.Bucket) - if matches != nil && matches[1] != "" { - pipeline.Bucket = matches[1] - } for _, step := range pipeline.Steps { if flags.StepName != "" && step.Step != flags.StepName { @@ -110,7 +105,7 @@ func runPipelineStep(pipeline *PipelineDefinition, step *PipelineDefinitionStep, yaml.NewYAMLOrJSONDecoder(stepPodBuffer, 4096).Decode(pod) pods := clientset.CoreV1().Pods(pipeline.Namespace) - err := deleteAndWait(clientset, podDefinition) + err := deleteAndWait(clientset, podDefinition, flags) if err != nil { return err } @@ -170,10 +165,10 @@ func runPipelineStep(pipeline *PipelineDefinition, step *PipelineDefinitionStep, return nil } -func deleteAndWait(c kubernetes.Interface, podDefinition *PodDefinition) error { +func deleteAndWait(c kubernetes.Interface, podDefinition *PodDefinition, flags *runCmdFlagsStruct) error { pods := clientset.CoreV1().Pods(podDefinition.Namespace) deleting := false - err := wait.PollImmediate(defaultPollInterval, defaultTimeout, func() (bool, error) { + err := wait.PollImmediate(flags.DeletePollInterval, deleteTimeout, func() (bool, error) { var err error err = pods.Delete(podDefinition.PodName, &metav1.DeleteOptions{}) if err != nil { diff --git a/cli/pipeline/run_test.go b/cli/pipeline/run_test.go index 4a6e127..2be9a59 100644 --- a/cli/pipeline/run_test.go +++ b/cli/pipeline/run_test.go @@ -21,6 +21,8 @@ func parseTimeOrDie(ts string) metav1.Time { return metav1.Time{Time: t} } +var testRunFlags = &runCmdFlagsStruct{TailLogs: false, DeletePollInterval: 1 * time.Millisecond} + func createPodStatus(phase v1.PodPhase, containers map[string]bool) v1.PodStatus { containerStatuses := make([]v1.ContainerStatus, len(containers)) for container, running := range containers { @@ -93,7 +95,7 @@ func TestRunPipelineSuccess(t *testing.T) { p := pod.DeepCopy() p.Status = createPodStatus(v1.PodRunning, map[string]bool{pod.Name + "/main": true, pod.Name + "/paddle": true}) fakeWatch.Add(p) - time.Sleep(1 * time.Second) + time.Sleep(100 * time.Millisecond) p = p.DeepCopy() p.Status = createPodStatus(v1.PodSucceeded, map[string]bool{pod.Name + "/main": true, pod.Name + "/paddle": true}) fakeWatch.Modify(p) @@ -101,7 +103,7 @@ func TestRunPipelineSuccess(t *testing.T) { return true, object, nil }) - runPipeline("test/sample_steps_passing.yml", &runCmdFlagsStruct{TailLogs: false}) + runPipeline("test/sample_steps_passing.yml", testRunFlags) expectPods := [2]string{"sample-steps-passing-step1-master", "sample-steps-passing-step2-master"} @@ -153,7 +155,7 @@ func TestRunPipelineFailure(t *testing.T) { return true, object, nil }) - runPipeline("test/sample_steps_passing.yml", &runCmdFlagsStruct{TailLogs: false}) + runPipeline("test/sample_steps_passing.yml", testRunFlags) if len(errors) != 2 { t.Errorf("excepted two errors, actual %v", len(errors)) diff --git a/cli/pipeline/template_test.go b/cli/pipeline/template_test.go new file mode 100644 index 0000000..59d6913 --- /dev/null +++ b/cli/pipeline/template_test.go @@ -0,0 +1,30 @@ +package pipeline + +import ( + "io/ioutil" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/yaml" + "testing" +) + +func TestCompileTemplate(t *testing.T) { + data, err := ioutil.ReadFile("test/sample_steps_passing.yml") + if err != nil { + panic(err.Error()) + } + pipeline := parsePipeline(data) + + podDefinition := NewPodDefinition(pipeline, &pipeline.Steps[0]) + stepPodBuffer := podDefinition.compile() + + pod := &v1.Pod{} + yaml.NewYAMLOrJSONDecoder(stepPodBuffer, 4096).Decode(pod) + + if pod.Name != "sample-steps-passing-step1-master" { + t.Errorf("Pod name is %s", pod.Name) + } + + if pod.Spec.Containers[0].Image != pipeline.Steps[0].Image { + t.Errorf("First image is %s", pod.Spec.Containers[0].Image) + } +} diff --git a/cli/pipeline/watch_test.go b/cli/pipeline/watch_test.go new file mode 100644 index 0000000..3da53d8 --- /dev/null +++ b/cli/pipeline/watch_test.go @@ -0,0 +1,63 @@ +package pipeline + +import ( + "context" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes/fake" + ktesting "k8s.io/client-go/testing" + "testing" + "time" +) + +func TestWatch(t *testing.T) { + client := fake.NewSimpleClientset() + clientset = client + + fakeWatch := watch.NewFake() + client.PrependWatchReactor("pods", ktesting.DefaultWatchReactor(fakeWatch, nil)) + + ctx, _ := context.WithCancel(context.Background()) + watchPod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testpod", + Namespace: "testnamespace", + }, + } + ch, _ := Watch(ctx, client, watchPod) + + events := make([]WatchEvent, 0) + + go func() { + for { + e := <-ch + events = append(events, e) + } + }() + + p := watchPod.DeepCopy() + p.Status = createPodStatus(v1.PodRunning, map[string]bool{"foo": true, "bar": true}) + fakeWatch.Add(p) + p = p.DeepCopy() + p.Status = createPodStatus(v1.PodSucceeded, map[string]bool{"foo": true, "bar": true}) + fakeWatch.Modify(p) + p = p.DeepCopy() + p.Status = createPodStatus(v1.PodRunning, map[string]bool{"foo": true, "bar": false}) + fakeWatch.Modify(p) + fakeWatch.Stop() + + time.Sleep(100 * time.Millisecond) + + types := []WatchEventType{Added, Added, Completed, Removed, Failed} + + if len(events) != len(types) { + t.Errorf("Expected %i events, got %i", len(types), len(events)) + } + + for i, et := range types { + if events[i].Type != et { + t.Errorf("Event %i type is %v, expected %v", i, events[i].Type, et) + } + } +} From 483c40543fe73b6b142edf8b516589e3523960e5 Mon Sep 17 00:00:00 2001 From: Ivan Pirlik Date: Tue, 2 Jan 2018 20:09:17 +0100 Subject: [PATCH 09/17] Fix Readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 59b80f6..f082fa0 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ Install dependencies: ``` brew install glide -glide up -v +glide i ``` You will need create a `$HOME/.paddle.yaml` that contains the bucket name, e.g: From dd3035e786a56d47d758b50103a36b06a56d3d8f Mon Sep 17 00:00:00 2001 From: Ivan Pirlik Date: Wed, 3 Jan 2018 13:34:58 +0100 Subject: [PATCH 10/17] Add better labels --- cli/pipeline/template.go | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/cli/pipeline/template.go b/cli/pipeline/template.go index 2849e48..61b1251 100644 --- a/cli/pipeline/template.go +++ b/cli/pipeline/template.go @@ -8,11 +8,12 @@ import ( ) type PodDefinition struct { - PodName string - StepName string - BranchName string - Namespace string - Bucket string + PodName string + StepName string + StepVersion string + BranchName string + Namespace string + Bucket string Step PipelineDefinitionStep } @@ -24,7 +25,10 @@ metadata: name: "{{ .PodName }}" namespace: {{ .Namespace }} labels: - canoe: pipeline + canoe.executor: paddle + canoe.step.name: {{ .StepName }} + canoe.step.branch: {{ .BranchName }} + canoe.step.version: {{ .StepVersion }} spec: restartPolicy: Never volumes: @@ -126,14 +130,16 @@ spec: func NewPodDefinition(pipelineDefinition *PipelineDefinition, pipelineDefinitionStep *PipelineDefinitionStep) *PodDefinition { stepName := sanitizeName(pipelineDefinitionStep.Step) branchName := sanitizeName(pipelineDefinitionStep.Branch) + stepVersion := sanitizeName(pipelineDefinitionStep.Version) podName := fmt.Sprintf("%s-%s-%s", sanitizeName(pipelineDefinition.Pipeline), stepName, branchName) return &PodDefinition{ - PodName: podName, - Namespace: pipelineDefinition.Namespace, - Step: *pipelineDefinitionStep, - Bucket: pipelineDefinition.Bucket, - StepName: stepName, - BranchName: branchName, + PodName: podName, + Namespace: pipelineDefinition.Namespace, + Step: *pipelineDefinitionStep, + Bucket: pipelineDefinition.Bucket, + StepName: stepName, + StepVersion: stepVersion, + BranchName: branchName, } } From fc741ec4b422761f02515a2d3f095d1d7862aff5 Mon Sep 17 00:00:00 2001 From: Ivan Pirlik Date: Wed, 3 Jan 2018 15:28:53 +0100 Subject: [PATCH 11/17] Add version command --- cli/pipeline/kubernetes.go | 14 ++++++++------ cli/root.go | 18 +++++++++--------- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/cli/pipeline/kubernetes.go b/cli/pipeline/kubernetes.go index 42c9522..fc4ab40 100644 --- a/cli/pipeline/kubernetes.go +++ b/cli/pipeline/kubernetes.go @@ -1,24 +1,26 @@ package pipeline import ( - "flag" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + clientcmdapi "k8s.io/client-go/tools/clientcmd/api" "os" "path/filepath" ) func getKubernetesConfig() (*rest.Config, error) { var config *rest.Config - var kubeconfig *string + var kubeconfig string if home := homeDir(); home != "" { - kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file") + kubeconfig = filepath.Join(home, ".kube", "config") } else { - kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file") + kubeconfig = "" } - flag.Parse() - config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig) + config, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig( + &clientcmd.ClientConfigLoadingRules{ExplicitPath: kubeconfig}, + &clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: ""}}).ClientConfig() + if err != nil { config, err = rest.InClusterConfig() } diff --git a/cli/root.go b/cli/root.go index 8be6033..0e659f8 100644 --- a/cli/root.go +++ b/cli/root.go @@ -25,19 +25,22 @@ import ( ) var cfgFile string +var outputVersion bool // RootCmd represents the base command when called without any subcommands var RootCmd = &cobra.Command{ Use: "paddle", Short: "Canoe tool for data archival and processing", Long: "Canoe tool for data archival and processing", - // Uncomment the following line if your bare application - // has an action associated with it: - // Run: func(cmd *cobra.Command, args []string) { }, + Run: func(cmd *cobra.Command, args []string) { + if outputVersion { + fmt.Println(PaddleVersion) + } else { + cmd.Help() + } + }, } -// Execute adds all child commands to the root command and sets flags appropriately. -// This is called by main.main(). It only needs to happen once to the rootCmd. func Execute() { if err := RootCmd.Execute(); err != nil { fmt.Println(err) @@ -49,10 +52,7 @@ func init() { cobra.OnInitialize(initConfig) RootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is $HOME/.paddle.yaml)") - - // // Cobra also supports local flags, which will only run - // // when this action is called directly. - // RootCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") + RootCmd.Flags().BoolVar(&outputVersion, "version", false, "output paddle version") RootCmd.AddCommand(data.DataCmd) RootCmd.AddCommand(pipeline.PipelineCmd) From 67f6a490b47fe22ba520c565657fe2c51e281c9c Mon Sep 17 00:00:00 2001 From: Ivan Pirlik Date: Wed, 3 Jan 2018 15:30:10 +0100 Subject: [PATCH 12/17] Add version generator --- generate_version.sh | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100755 generate_version.sh diff --git a/generate_version.sh b/generate_version.sh new file mode 100755 index 0000000..1946462 --- /dev/null +++ b/generate_version.sh @@ -0,0 +1,7 @@ +version=`cat VERSION` +# Write out the package. +cat << EOF > cli/version.go +package cli +//go:generate bash ./generate_version.sh +var PaddleVersion = "$version" +EOF From c58e5d2d0141d39023ccde5aa418256d9ee07b13 Mon Sep 17 00:00:00 2001 From: Ivan Pirlik Date: Wed, 3 Jan 2018 15:30:43 +0100 Subject: [PATCH 13/17] Version 0.2.0 --- VERSION | 1 + cli/version.go | 3 +++ 2 files changed, 4 insertions(+) create mode 100644 VERSION create mode 100644 cli/version.go diff --git a/VERSION b/VERSION new file mode 100644 index 0000000..0ea3a94 --- /dev/null +++ b/VERSION @@ -0,0 +1 @@ +0.2.0 diff --git a/cli/version.go b/cli/version.go new file mode 100644 index 0000000..3b897ef --- /dev/null +++ b/cli/version.go @@ -0,0 +1,3 @@ +package cli +//go:generate bash ./generate_version.sh +var PaddleVersion = "0.2.1" From a49b9ef125f1465d9dffe0c2d7fcd454795abe9a Mon Sep 17 00:00:00 2001 From: Ivan Pirlik Date: Wed, 3 Jan 2018 15:32:53 +0100 Subject: [PATCH 14/17] Add release script --- README.md | 8 ++++---- release.sh | 16 ++++++++++++++++ 2 files changed, 20 insertions(+), 4 deletions(-) create mode 100755 release.sh diff --git a/README.md b/README.md index f082fa0..32108ae 100644 --- a/README.md +++ b/README.md @@ -52,12 +52,12 @@ $ go test ./... ## Release -In order to release a new version, set up github export GITHUB_TOKEN=[YOUR_TOKEN] and do the following steps: +In order to release a new version, set up github export GITHUB_TOKEN=[YOUR_TOKEN] + +Ensure your git repo is clean. Then update VERSION (no need to commit it, it will be committed automatically), and run: ``` -$ git tag -a vX.X.X -m "[Comment]" -$ git push origin vX.X.X -$ goreleaser +$ ./release.sh ``` ## Usage diff --git a/release.sh b/release.sh new file mode 100755 index 0000000..2faa99e --- /dev/null +++ b/release.sh @@ -0,0 +1,16 @@ +#!/usr/bin/env bash + +set -e +set -x + +die() { echo "$*" 1>&2 ; exit 1; } + +VERSION=`cat VERSION | tr -d '\n'` +git diff-index --quiet --cached HEAD -- || die "Index dirty, commit first" +go generate +git add VERSION +git add cli/version.go +git commit -m "Version $VERSION" +git tag -a v$VERSION -m "Version $VERSION" +git push origin v$VERSION +goreleaser From ea974119770e95e141b067cb85ccfbc05b0d0275 Mon Sep 17 00:00:00 2001 From: Ivan Pirlik Date: Wed, 3 Jan 2018 15:34:24 +0100 Subject: [PATCH 15/17] Update release script --- release.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/release.sh b/release.sh index 2faa99e..f2029d6 100755 --- a/release.sh +++ b/release.sh @@ -10,7 +10,7 @@ git diff-index --quiet --cached HEAD -- || die "Index dirty, commit first" go generate git add VERSION git add cli/version.go -git commit -m "Version $VERSION" +git commit -m "Version $VERSION" || echo "Version not changed" git tag -a v$VERSION -m "Version $VERSION" git push origin v$VERSION goreleaser From 75daae4e407c3b3ca7ea9048fc1f0a8af6bfa889 Mon Sep 17 00:00:00 2001 From: Ivan Pirlik Date: Wed, 3 Jan 2018 16:17:35 +0100 Subject: [PATCH 16/17] Fix release script --- generate_version.sh => cli/generate_version.sh | 4 ++-- release.sh | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) rename generate_version.sh => cli/generate_version.sh (69%) diff --git a/generate_version.sh b/cli/generate_version.sh similarity index 69% rename from generate_version.sh rename to cli/generate_version.sh index 1946462..ac4b80e 100755 --- a/generate_version.sh +++ b/cli/generate_version.sh @@ -1,6 +1,6 @@ -version=`cat VERSION` +version=`cat ../VERSION` # Write out the package. -cat << EOF > cli/version.go +cat << EOF > version.go package cli //go:generate bash ./generate_version.sh var PaddleVersion = "$version" diff --git a/release.sh b/release.sh index f2029d6..ca4b284 100755 --- a/release.sh +++ b/release.sh @@ -7,7 +7,7 @@ die() { echo "$*" 1>&2 ; exit 1; } VERSION=`cat VERSION | tr -d '\n'` git diff-index --quiet --cached HEAD -- || die "Index dirty, commit first" -go generate +go generate ./cli git add VERSION git add cli/version.go git commit -m "Version $VERSION" || echo "Version not changed" From 65c482ea35caf74b2298bc4abd75a502725f047b Mon Sep 17 00:00:00 2001 From: Ivan Pirlik Date: Wed, 3 Jan 2018 16:18:01 +0100 Subject: [PATCH 17/17] Version 0.2.4 --- VERSION | 2 +- cli/version.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/VERSION b/VERSION index 0ea3a94..abd4105 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.2.0 +0.2.4 diff --git a/cli/version.go b/cli/version.go index 3b897ef..4b4a3c7 100644 --- a/cli/version.go +++ b/cli/version.go @@ -1,3 +1,3 @@ package cli //go:generate bash ./generate_version.sh -var PaddleVersion = "0.2.1" +var PaddleVersion = "0.2.4"