#!/usr/bin/env bash
# Regenerates version.go (package cli) from the VERSION file that lives one
# directory above this script. The generated file carries a
# `//go:generate bash ./generate_version.sh` directive, so `go generate` re-runs
# this script.
#
# Fail fast: without this, a missing or unreadable VERSION file would silently
# produce `var PaddleVersion = ""`.
set -euo pipefail

# Resolve paths relative to the script itself so the result does not depend on
# the caller's working directory.
script_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
version="$(cat "${script_dir}/../VERSION")"

# Write out the package.
cat << EOF > "${script_dir}/version.go"
package cli
//go:generate bash ./generate_version.sh
var PaddleVersion = "$version"
EOF
// homeDir reports the user's home directory as described by the environment.
// $HOME wins when it is set and non-empty; otherwise the value of USERPROFILE
// (its Windows counterpart) is returned, which is "" when that is unset too.
func homeDir() string {
	dir := os.Getenv("HOME")
	if dir == "" {
		dir = os.Getenv("USERPROFILE") // windows
	}
	return dir
}
PipelineDefinitionStep struct { + Step string `yaml:"step"` + Version string `yaml:"version"` + Branch string `yaml:"branch"` + Image string `yaml:"image"` + Inputs []struct { + Step string `yaml:"step"` + Version string `yaml:"version"` + Branch string `yaml:"branch"` + Path string `yaml:"path"` + } `yaml:"inputs"` + Commands []string `yaml:"commands"` + Resources struct { + CPU int `yaml:"cpu"` + Memory string `yaml:"memory"` + } `yaml:"resources"` +} + +type PipelineDefinition struct { + Pipeline string `yaml:"pipeline"` + Bucket string `yaml:"bucket"` + Namespace string `yaml:"namespace"` + Steps []PipelineDefinitionStep `yaml:"steps"` +} + +func parsePipeline(data []byte) *PipelineDefinition { + pipeline := PipelineDefinition{} + + err := yaml.Unmarshal(data, &pipeline) + if err != nil { + log.Fatalf("error: %v", err) + } + + // For compatibility with Ansible executor + r, _ := regexp.Compile("default\\('(.+)'\\)") + matches := r.FindStringSubmatch(pipeline.Bucket) + if matches != nil && matches[1] != "" { + pipeline.Bucket = matches[1] + } + + return &pipeline +} diff --git a/cli/pipeline/pipeline_definition_test.go b/cli/pipeline/pipeline_definition_test.go new file mode 100644 index 0000000..6bf23c0 --- /dev/null +++ b/cli/pipeline/pipeline_definition_test.go @@ -0,0 +1,22 @@ +package pipeline + +import ( + "io/ioutil" + "testing" +) + +func TestParsePipeline(t *testing.T) { + data, err := ioutil.ReadFile("test/sample_steps_passing.yml") + if err != nil { + panic(err.Error()) + } + pipeline := parsePipeline(data) + + if len(pipeline.Steps) != 2 { + t.Errorf("excepted two steps, got %i", len(pipeline.Steps)) + } + + if pipeline.Bucket != "canoe-sample-pipeline" { + t.Errorf("Expected bucket to be canoe-sample-pipeline, got %s", pipeline.Bucket) + } +} diff --git a/cli/pipeline/run.go b/cli/pipeline/run.go new file mode 100644 index 0000000..2556d72 --- /dev/null +++ b/cli/pipeline/run.go @@ -0,0 +1,193 @@ +// Copyright © 2017 RooFoods LTD +// Licensed under 
the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipeline + +import ( + "context" + "errors" + "fmt" + "github.com/spf13/cobra" + "io/ioutil" + "k8s.io/api/core/v1" + k8errors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/util/yaml" + "k8s.io/client-go/kubernetes" + "log" + "time" +) + +type runCmdFlagsStruct struct { + StepName string + BucketName string + TailLogs bool + DeletePollInterval time.Duration +} + +const defaultDeletePollInterval = 2 * time.Second +const deleteTimeout = 120 * time.Second + +var runCmdFlags *runCmdFlagsStruct +var clientset kubernetes.Interface + +var logFatalf = log.Fatalf + +var runCmd = &cobra.Command{ + Use: "run [pipeline_yaml]", + Short: "Run a pipeline or a pipeline step", + Args: cobra.ExactArgs(1), + Long: `Store data into S3 under a versioned path, and update HEAD. 
+ +Example: + +$ paddle pipeline run test_pipeline.yaml +`, + Run: func(cmd *cobra.Command, args []string) { + runPipeline(args[0], runCmdFlags) + }, +} + +func init() { + runCmdFlags = &runCmdFlagsStruct{} + runCmd.Flags().StringVarP(&runCmdFlags.StepName, "step", "s", "", "Single step to execute") + runCmd.Flags().StringVarP(&runCmdFlags.BucketName, "bucket", "b", "", "Bucket name") + runCmd.Flags().BoolVarP(&runCmdFlags.TailLogs, "logs", "l", true, "Tail logs") + runCmdFlags.DeletePollInterval = defaultDeletePollInterval + + config, err := getKubernetesConfig() + if err != nil { + panic(err.Error()) + } + clientset, err = kubernetes.NewForConfig(config) + if err != nil { + panic(err.Error()) + } +} + +func runPipeline(path string, flags *runCmdFlagsStruct) { + data, err := ioutil.ReadFile(path) + if err != nil { + panic(err.Error()) + } + pipeline := parsePipeline(data) + if flags.BucketName != "" { + pipeline.Bucket = flags.BucketName + } + + for _, step := range pipeline.Steps { + if flags.StepName != "" && step.Step != flags.StepName { + continue + } + err = runPipelineStep(pipeline, &step, flags) + if err != nil { + logFatalf("[paddle] %s", err.Error()) + } + } +} + +func runPipelineStep(pipeline *PipelineDefinition, step *PipelineDefinitionStep, flags *runCmdFlagsStruct) error { + log.Printf("[paddle] Running step %s", step.Step) + podDefinition := NewPodDefinition(pipeline, step) + stepPodBuffer := podDefinition.compile() + pod := &v1.Pod{} + yaml.NewYAMLOrJSONDecoder(stepPodBuffer, 4096).Decode(pod) + pods := clientset.CoreV1().Pods(pipeline.Namespace) + + err := deleteAndWait(clientset, podDefinition, flags) + if err != nil { + return err + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + watch, err := Watch(ctx, clientset, pod) + if err != nil { + return err + } + + pod, err = pods.Create(pod) + if err != nil { + return err + } + + containers := make(map[string]bool) + + for { + e := <-watch + switch e.Type { + case 
Added: + log.Printf("[paddle] Container %s/%s starting", pod.Name, e.Container) + containers[e.Container] = true + if flags.TailLogs { + TailLogs(ctx, clientset, e.Pod, e.Container) + } + case Deleted: + case Removed: + log.Printf("[paddle] Container removed: %s", e.Container) + continue + case Completed: + log.Printf("[paddle] Pod execution completed") + return nil + case Failed: + var msg string + if e.Container != "" { + if e.Message != "" { + msg = fmt.Sprintf("Container %s/%s failed: '%s'", pod.Name, e.Container, e.Message) + } else { + msg = fmt.Sprintf("Container %s/%s failed", pod.Name, e.Container) + } + _, present := containers[e.Container] + if !present && flags.TailLogs { // container died before being added + TailLogs(ctx, clientset, e.Pod, e.Container) + time.Sleep(3 * time.Second) // give it time to tail logs + } + } else { + msg = "Pod failed" + } + return errors.New(msg) + } + } + + log.Printf("[paddle] Finishing pod execution") + return nil +} + +func deleteAndWait(c kubernetes.Interface, podDefinition *PodDefinition, flags *runCmdFlagsStruct) error { + pods := clientset.CoreV1().Pods(podDefinition.Namespace) + deleting := false + err := wait.PollImmediate(flags.DeletePollInterval, deleteTimeout, func() (bool, error) { + var err error + err = pods.Delete(podDefinition.PodName, &metav1.DeleteOptions{}) + if err != nil { + if k8errors.IsNotFound(err) { + if deleting { + log.Printf("[paddle] deleted pod %s", podDefinition.PodName) + } + return true, nil + } else { + return true, err + } + } + if deleting { + log.Print("[paddle] .") + } else { + log.Printf("[paddle] deleting pod %s", podDefinition.PodName) + deleting = true + } + return false, nil + }) + return err +} diff --git a/cli/pipeline/run_test.go b/cli/pipeline/run_test.go new file mode 100644 index 0000000..2be9a59 --- /dev/null +++ b/cli/pipeline/run_test.go @@ -0,0 +1,163 @@ +package pipeline + +import ( + "fmt" + "k8s.io/api/core/v1" + k8errors "k8s.io/apimachinery/pkg/api/errors" + 
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes/fake" + ktesting "k8s.io/client-go/testing" + "testing" + "time" +) + +func parseTimeOrDie(ts string) metav1.Time { + t, err := time.Parse(time.RFC3339, ts) + if err != nil { + panic(err) + } + return metav1.Time{Time: t} +} + +var testRunFlags = &runCmdFlagsStruct{TailLogs: false, DeletePollInterval: 1 * time.Millisecond} + +func createPodStatus(phase v1.PodPhase, containers map[string]bool) v1.PodStatus { + containerStatuses := make([]v1.ContainerStatus, len(containers)) + for container, running := range containers { + var state v1.ContainerState + if running { + state = v1.ContainerState{ + Running: &v1.ContainerStateRunning{ + StartedAt: parseTimeOrDie("2015-04-22T11:49:32Z"), + }, + } + } else { + state = v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{ + ExitCode: 1, + }, + } + } + containerStatuses = append(containerStatuses, v1.ContainerStatus{ + Name: container, + State: state, + Ready: true, + RestartCount: 0, + Image: "test.com/test", + ImageID: "docker://b6b9a86dc06aa1361357ca1b105feba961f6a4145adca6c54e142c0be0fe87b0", + ContainerID: "docker://b6b9a86dc06aa1361357ca1b105feba961f6a4145adca6c54e142c0be0fe87b0", + }) + } + + return v1.PodStatus{ + Phase: phase, + Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + Status: v1.ConditionTrue, + }, + }, + ContainerStatuses: containerStatuses, + } +} + +func TestRunPipelineSuccess(t *testing.T) { + client := fake.NewSimpleClientset() + clientset = client + + fakeWatch := watch.NewFake() + client.PrependWatchReactor("pods", ktesting.DefaultWatchReactor(fakeWatch, nil)) + + deleted := make(map[string]int) + + client.PrependReactor("delete", "pods", func(action ktesting.Action) (bool, runtime.Object, error) { + a := action.(ktesting.DeleteAction) + name := a.GetName() + deleted[name] += 1 + if deleted[name] < 2 { + return true, nil, nil + } else { + 
fakeWatch.Reset() + return true, nil, k8errors.NewNotFound(v1.Resource("pods"), name) + } + }) + + created := make(map[string]int) + + client.PrependReactor("create", "pods", func(action ktesting.Action) (bool, runtime.Object, error) { + a := action.(ktesting.CreateAction) + object := a.GetObject() + pod := object.(*v1.Pod) + created[pod.Name] += 1 + go func() { + p := pod.DeepCopy() + p.Status = createPodStatus(v1.PodRunning, map[string]bool{pod.Name + "/main": true, pod.Name + "/paddle": true}) + fakeWatch.Add(p) + time.Sleep(100 * time.Millisecond) + p = p.DeepCopy() + p.Status = createPodStatus(v1.PodSucceeded, map[string]bool{pod.Name + "/main": true, pod.Name + "/paddle": true}) + fakeWatch.Modify(p) + }() + return true, object, nil + }) + + runPipeline("test/sample_steps_passing.yml", testRunFlags) + + expectPods := [2]string{"sample-steps-passing-step1-master", "sample-steps-passing-step2-master"} + + for _, p := range expectPods { + if deleted["sample-steps-passing-step1-master"] != 2 { + t.Errorf("excepted delete of "+p+" to be called twice, got %i", deleted[p]) + } + if created[p] != 1 { + t.Errorf("excepted create of "+p+" to be called once, got %i", created[p]) + } + } +} + +func TestRunPipelineFailure(t *testing.T) { + origLogFatalf := logFatalf + + // after this test, replace the original fatal function + defer func() { logFatalf = origLogFatalf }() + + errors := []string{} + logFatalf = func(format string, args ...interface{}) { + if len(args) > 0 { + errors = append(errors, fmt.Sprintf(format, args)) + } else { + errors = append(errors, format) + } + } + + client := fake.NewSimpleClientset() + clientset = client + + fakeWatch := watch.NewFake() + client.PrependWatchReactor("pods", ktesting.DefaultWatchReactor(fakeWatch, nil)) + + client.PrependReactor("delete", "pods", func(action ktesting.Action) (bool, runtime.Object, error) { + fakeWatch.Reset() + return true, nil, k8errors.NewNotFound(v1.Resource("pods"), 
action.(ktesting.DeleteAction).GetName()) + }) + + client.PrependReactor("create", "pods", func(action ktesting.Action) (bool, runtime.Object, error) { + a := action.(ktesting.CreateAction) + object := a.GetObject() + pod := object.(*v1.Pod) + go func() { + p := pod.DeepCopy() + p.Status = createPodStatus(v1.PodRunning, map[string]bool{pod.Name + "/main": true, pod.Name + "/paddle": false}) + fakeWatch.Add(p) + }() + return true, object, nil + }) + + runPipeline("test/sample_steps_passing.yml", testRunFlags) + + if len(errors) != 2 { + t.Errorf("excepted two errors, actual %v", len(errors)) + } +} diff --git a/cli/pipeline/template.go b/cli/pipeline/template.go new file mode 100644 index 0000000..61b1251 --- /dev/null +++ b/cli/pipeline/template.go @@ -0,0 +1,165 @@ +package pipeline + +import ( + "bytes" + "fmt" + "strings" + "text/template" +) + +type PodDefinition struct { + PodName string + StepName string + StepVersion string + BranchName string + Namespace string + Bucket string + + Step PipelineDefinitionStep +} + +const podTemplate = ` +apiVersion: v1 +kind: Pod +metadata: + name: "{{ .PodName }}" + namespace: {{ .Namespace }} + labels: + canoe.executor: paddle + canoe.step.name: {{ .StepName }} + canoe.step.branch: {{ .BranchName }} + canoe.step.version: {{ .StepVersion }} +spec: + restartPolicy: Never + volumes: + - + name: shared-data + emptyDir: + medium: '' + containers: + - + name: main + image: "{{ .Step.Image }}" + volumeMounts: + - + name: shared-data + mountPath: /data + resources: + limits: + cpu: "{{ .Step.Resources.CPU }}" + memory: "{{ .Step.Resources.Memory }}" + command: + - "/bin/sh" + - "-c" + - "while true; do if [ -e /data/first-step.txt ]; then (( + {{ range $index, $command := .Step.Commands }} + ({{ $command }}) && + {{ end }} + touch /data/main-passed.txt) || (touch /data/main-failed.txt && exit 1)) && touch /data/main.txt; break; fi; done" + env: + - + name: INPUT_PATH + value: /data/input + - + name: OUTPUT_PATH + value: 
/data/output + - + name: AWS_ACCESS_KEY_ID + valueFrom: + secretKeyRef: + name: aws-credentials-training + key: aws-access-key-id + - + name: AWS_SECRET_ACCESS_KEY + valueFrom: + secretKeyRef: + name: aws-credentials-training + key: aws-secret-access-key + - + name: paddle + image: "219541440308.dkr.ecr.eu-west-1.amazonaws.com/paddlecontainer:latest" + volumeMounts: + - + name: shared-data + mountPath: /data + command: + - "/bin/sh" + - "-c" + - "mkdir -p $INPUT_PATH $OUTPUT_PATH && + {{ range $index, $input := .Step.Inputs }} + paddle data get {{ $input.Step }}/{{ $input.Version }} $INPUT_PATH -b {{ $input.Branch | sanitizeName }} -p {{ $input.Path }} && + {{ end }} + touch /data/first-step.txt && + echo first step finished && + (while true; do + if [ -e /data/main-failed.txt ]; then + exit 1; + fi; + if [ -e /data/main-passed.txt ]; then + paddle data commit $OUTPUT_PATH {{ .StepName }}/{{ .Step.Version }} -b {{ .BranchName }}; + exit 0; + fi; + done)" + env: + - + name: BUCKET + value: "{{ .Bucket }}" + - + name: AWS_REGION + value: eu-west-1 + - + name: INPUT_PATH + value: /data/input + - + name: OUTPUT_PATH + value: /data/output + - + name: AWS_ACCESS_KEY_ID + valueFrom: + secretKeyRef: + name: aws-credentials + key: aws-access-key-id + - + name: AWS_SECRET_ACCESS_KEY + valueFrom: + secretKeyRef: + name: aws-credentials + key: aws-secret-access-key +` + +func NewPodDefinition(pipelineDefinition *PipelineDefinition, pipelineDefinitionStep *PipelineDefinitionStep) *PodDefinition { + stepName := sanitizeName(pipelineDefinitionStep.Step) + branchName := sanitizeName(pipelineDefinitionStep.Branch) + stepVersion := sanitizeName(pipelineDefinitionStep.Version) + podName := fmt.Sprintf("%s-%s-%s", sanitizeName(pipelineDefinition.Pipeline), stepName, branchName) + return &PodDefinition{ + PodName: podName, + Namespace: pipelineDefinition.Namespace, + Step: *pipelineDefinitionStep, + Bucket: pipelineDefinition.Bucket, + StepName: stepName, + StepVersion: stepVersion, + 
// nameSanitizer rewrites every "_" and "/" to "-" in a single pass over the
// input. A Replacer is safe for concurrent use, so it is built once.
var nameSanitizer = strings.NewReplacer("_", "-", "/", "-")

// sanitizeName normalises a pipeline, step, branch or version name: the name
// is lower-cased and every underscore and slash is replaced by a hyphen.
// All other characters are passed through unchanged.
func sanitizeName(name string) string {
	return nameSanitizer.Replace(strings.ToLower(name))
}
sample-pipeline-data > ${OUTPUT_PATH}/sample-pipeline-data.txt + resources: + cpu: 1 + memory: 1Gi + + - + step: step2 + version: version1a + inputs: + - + step: step1 + version: version1 + branch: master + path: HEAD + image: 219541440308.dkr.ecr.eu-west-1.amazonaws.com/paddlecontainer:latest + branch: master + commands: + - echo executing sample-pipeline-data > ${OUTPUT_PATH}/sample-pipeline-data-model.txt + resources: + cpu: 2 + memory: 2Gi + diff --git a/cli/pipeline/watch.go b/cli/pipeline/watch.go new file mode 100644 index 0000000..dec6b86 --- /dev/null +++ b/cli/pipeline/watch.go @@ -0,0 +1,143 @@ +package pipeline + +import ( + "bufio" + "context" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" + "log" +) + +type WatchEventType string + +const ( + Added WatchEventType = "ADDED" + Deleted WatchEventType = "DELETED" + Removed WatchEventType = "REMOVED" + Completed WatchEventType = "COMPLETED" + Failed WatchEventType = "FAILED" +) + +type WatchEvent struct { + Type WatchEventType + Pod *v1.Pod + Container string + Message string +} + +func Watch(ctx context.Context, c kubernetes.Interface, watchPod *v1.Pod) (<-chan WatchEvent, error) { + podSelector, err := fields.ParseSelector("metadata.name=" + watchPod.Name) + if err != nil { + return nil, err + } + options := metav1.ListOptions{ + FieldSelector: podSelector.String(), + Watch: true, + } + + watcher, err := c.CoreV1().Pods(watchPod.Namespace).Watch(options) + + out := make(chan WatchEvent) + + containers := make(map[string]bool) + + go func() { + for { + select { + case e := <-watcher.ResultChan(): + if e.Object == nil { + // Closed because of error + return + } + + pod := e.Object.(*v1.Pod) + + switch e.Type { + case watch.Added, watch.Modified: + if pod.Status.Phase == v1.PodSucceeded { + out <- WatchEvent{Completed, pod, "", ""} + } else if pod.Status.Phase == v1.PodFailed { + out 
<- WatchEvent{Failed, pod, "", ""} + } else { + for _, container := range pod.Status.ContainerStatuses { + if container.State.Running != nil { + _, present := containers[container.Name] + if !present { + out <- WatchEvent{Added, pod, container.Name, ""} + containers[container.Name] = true + } + } else if container.State.Terminated != nil { + _, present := containers[container.Name] + if present { + out <- WatchEvent{Removed, pod, container.Name, ""} + containers[container.Name] = false + } + if container.State.Terminated.ExitCode != 0 { + out <- WatchEvent{Failed, pod, container.Name, container.State.Terminated.Message} + } + } + } + } + case watch.Deleted: + out <- WatchEvent{Deleted, pod, "", ""} + case watch.Error: + log.Printf("Pod error") + } + case <-ctx.Done(): + watcher.Stop() + close(out) + return + } + } + }() + + return out, nil +} + +func TailLogs(ctx context.Context, c kubernetes.Interface, pod *v1.Pod, container string) { + pods := c.Core().Pods(pod.Namespace) + + req := pods.GetLogs(pod.Name, &v1.PodLogOptions{ + Container: container, + Follow: true, + }) + + closed := make(chan struct{}) + + go func() { + + stream, err := req.Stream() + + if err != nil { + log.Fatalf("Error opening log stream for pod %s", pod.Name) + } + + defer stream.Close() + + go func() { + <-closed + stream.Close() + }() + + go func() { + <-ctx.Done() + close(closed) + }() + + reader := bufio.NewReader(stream) + + for { + line, err := reader.ReadBytes('\n') + if err != nil { + return + } + + str := string(line) + + log.Printf("[%s/%s]: %s", pod.Name, container, str) + } + }() +} diff --git a/cli/pipeline/watch_test.go b/cli/pipeline/watch_test.go new file mode 100644 index 0000000..3da53d8 --- /dev/null +++ b/cli/pipeline/watch_test.go @@ -0,0 +1,63 @@ +package pipeline + +import ( + "context" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes/fake" + ktesting "k8s.io/client-go/testing" + 
"testing" + "time" +) + +func TestWatch(t *testing.T) { + client := fake.NewSimpleClientset() + clientset = client + + fakeWatch := watch.NewFake() + client.PrependWatchReactor("pods", ktesting.DefaultWatchReactor(fakeWatch, nil)) + + ctx, _ := context.WithCancel(context.Background()) + watchPod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testpod", + Namespace: "testnamespace", + }, + } + ch, _ := Watch(ctx, client, watchPod) + + events := make([]WatchEvent, 0) + + go func() { + for { + e := <-ch + events = append(events, e) + } + }() + + p := watchPod.DeepCopy() + p.Status = createPodStatus(v1.PodRunning, map[string]bool{"foo": true, "bar": true}) + fakeWatch.Add(p) + p = p.DeepCopy() + p.Status = createPodStatus(v1.PodSucceeded, map[string]bool{"foo": true, "bar": true}) + fakeWatch.Modify(p) + p = p.DeepCopy() + p.Status = createPodStatus(v1.PodRunning, map[string]bool{"foo": true, "bar": false}) + fakeWatch.Modify(p) + fakeWatch.Stop() + + time.Sleep(100 * time.Millisecond) + + types := []WatchEventType{Added, Added, Completed, Removed, Failed} + + if len(events) != len(types) { + t.Errorf("Expected %i events, got %i", len(types), len(events)) + } + + for i, et := range types { + if events[i].Type != et { + t.Errorf("Event %i type is %v, expected %v", i, events[i].Type, et) + } + } +} diff --git a/cli/root.go b/cli/root.go index ae2c4c7..0e659f8 100644 --- a/cli/root.go +++ b/cli/root.go @@ -18,25 +18,29 @@ import ( "os" "github.com/deliveroo/paddle/cli/data" + "github.com/deliveroo/paddle/cli/pipeline" homedir "github.com/mitchellh/go-homedir" "github.com/spf13/cobra" "github.com/spf13/viper" ) var cfgFile string +var outputVersion bool // RootCmd represents the base command when called without any subcommands var RootCmd = &cobra.Command{ Use: "paddle", Short: "Canoe tool for data archival and processing", Long: "Canoe tool for data archival and processing", - // Uncomment the following line if your bare application - // has an action associated 
with it: - // Run: func(cmd *cobra.Command, args []string) { }, + Run: func(cmd *cobra.Command, args []string) { + if outputVersion { + fmt.Println(PaddleVersion) + } else { + cmd.Help() + } + }, } -// Execute adds all child commands to the root command and sets flags appropriately. -// This is called by main.main(). It only needs to happen once to the rootCmd. func Execute() { if err := RootCmd.Execute(); err != nil { fmt.Println(err) @@ -48,13 +52,10 @@ func init() { cobra.OnInitialize(initConfig) RootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is $HOME/.paddle.yaml)") - - // // Cobra also supports local flags, which will only run - // // when this action is called directly. - // RootCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") + RootCmd.Flags().BoolVar(&outputVersion, "version", false, "output paddle version") RootCmd.AddCommand(data.DataCmd) - + RootCmd.AddCommand(pipeline.PipelineCmd) } // initConfig reads in config file and ENV variables if set. 
diff --git a/cli/version.go b/cli/version.go new file mode 100644 index 0000000..4b4a3c7 --- /dev/null +++ b/cli/version.go @@ -0,0 +1,3 @@ +package cli +//go:generate bash ./generate_version.sh +var PaddleVersion = "0.2.4" diff --git a/glide.lock b/glide.lock new file mode 100644 index 0000000..5bf8ae6 --- /dev/null +++ b/glide.lock @@ -0,0 +1,310 @@ +hash: 8be82b4770683548c48ea100f6395f68896018f3bc2119d69e22d459aeea91ea +updated: 2017-12-22T22:34:41.613793Z +imports: +- name: github.com/aws/aws-sdk-go + version: 82ad808f2307df0776c038bfd7ea85440a35c02e + subpackages: + - aws + - aws/awserr + - aws/awsutil + - aws/client + - aws/client/metadata + - aws/corehandlers + - aws/credentials + - aws/credentials/ec2rolecreds + - aws/credentials/endpointcreds + - aws/credentials/stscreds + - aws/defaults + - aws/ec2metadata + - aws/endpoints + - aws/request + - aws/session + - aws/signer/v4 + - internal/shareddefaults + - private/protocol + - private/protocol/query + - private/protocol/query/queryutil + - private/protocol/rest + - private/protocol/restxml + - private/protocol/xml/xmlutil + - service/s3 + - service/s3/s3iface + - service/s3/s3manager + - service/sts +- name: github.com/davecgh/go-spew + version: 346938d642f2ec3594ed81d874461961cd0faa76 + subpackages: + - spew +- name: github.com/emicklei/go-restful + version: ff4f55a206334ef123e4f79bbf348980da81ca46 + subpackages: + - log +- name: github.com/fsnotify/fsnotify + version: 4da3e2cfbabc9f751898f250b49f2439785783a1 +- name: github.com/ghodss/yaml + version: 73d445a93680fa1a78ae23a5839bad48f32ba1ee +- name: github.com/go-ini/ini + version: 32e4c1e6bc4e7d0d8451aa6b75200d19e37a536a +- name: github.com/go-openapi/jsonpointer + version: 46af16f9f7b149af66e5d1bd010e3574dc06de98 +- name: github.com/go-openapi/jsonreference + version: 13c6e3589ad90f49bd3e3bbe2c2cb3d7a4142272 +- name: github.com/go-openapi/spec + version: 7abd5745472fff5eb3685386d5fb8bf38683154d +- name: github.com/go-openapi/swag + version: 
f3f9494671f93fcff853e3c6e9e948b3eb71e590 +- name: github.com/gogo/protobuf + version: c0656edd0d9eab7c66d1eb0c568f9039345796f7 + subpackages: + - proto + - sortkeys +- name: github.com/golang/glog + version: 44145f04b68cf362d9c4df2182967c2275eaefed +- name: github.com/golang/protobuf + version: 1643683e1b54a9e88ad26d98f81400c8c9d9f4f9 + subpackages: + - proto + - ptypes + - ptypes/any + - ptypes/duration + - ptypes/timestamp +- name: github.com/google/btree + version: 7d79101e329e5a3adf994758c578dab82b90c017 +- name: github.com/google/gofuzz + version: 44d81051d367757e1c7c6a5a86423ece9afcf63c +- name: github.com/googleapis/gnostic + version: 0c5108395e2debce0d731cf0287ddf7242066aba + subpackages: + - OpenAPIv2 + - compiler + - extensions +- name: github.com/gregjones/httpcache + version: 787624de3eb7bd915c329cba748687a3b22666a6 + subpackages: + - diskcache +- name: github.com/hashicorp/golang-lru + version: a0d98a5f288019575c6d1f4bb1573fef2d1fcdc4 + subpackages: + - simplelru +- name: github.com/hashicorp/hcl + version: 23c074d0eceb2b8a5bfdbb271ab780cde70f05a8 + subpackages: + - hcl/ast + - hcl/parser + - hcl/scanner + - hcl/strconv + - hcl/token + - json/parser + - json/scanner + - json/token +- name: github.com/howeyc/gopass + version: bf9dde6d0d2c004a008c27aaee91170c786f6db8 +- name: github.com/imdario/mergo + version: 6633656539c1639d9d78127b7d47c622b5d7b6dc +- name: github.com/inconshreveable/mousetrap + version: 76626ae9c91c4f2a10f34cad8ce83ea42c93bb75 +- name: github.com/jmespath/go-jmespath + version: dd801d4f4ce7ac746e7e7b4489d2fa600b3b096b +- name: github.com/json-iterator/go + version: 36b14963da70d11297d313183d7e6388c8510e1e +- name: github.com/juju/ratelimit + version: 5b9ff866471762aa2ab2dced63c9fb6f53921342 +- name: github.com/magiconair/properties + version: 49d762b9817ba1c2e9d0c69183c2b4a8b8f1d934 +- name: github.com/mailru/easyjson + version: 2f5df55504ebc322e4d52d34df6a1f5b503bf26d + subpackages: + - buffer + - jlexer + - jwriter +- name: 
github.com/mitchellh/go-homedir + version: b8bc1bf767474819792c23f32d8286a45736f1c6 +- name: github.com/mitchellh/mapstructure + version: 06020f85339e21b2478f756a78e295255ffa4d6a +- name: github.com/pelletier/go-toml + version: 0131db6d737cfbbfb678f8b7d92e55e27ce46224 +- name: github.com/peterbourgon/diskv + version: 5f041e8faa004a95c88a202771f4cc3e991971e6 +- name: github.com/PuerkitoBio/purell + version: 8a290539e2e8629dbc4e6bad948158f790ec31f4 +- name: github.com/PuerkitoBio/urlesc + version: 5bd2802263f21d8788851d5305584c82a5c75d7e +- name: github.com/spf13/afero + version: 8d919cbe7e2627e417f3e45c3c0e489a5b7e2536 + subpackages: + - mem +- name: github.com/spf13/cast + version: acbeb36b902d72a7a4c18e8f3241075e7ab763e4 +- name: github.com/spf13/cobra + version: 7b2c5ac9fc04fc5efafb60700713d4fa609b777b +- name: github.com/spf13/jwalterweatherman + version: 12bd96e66386c1960ab0f74ced1362f66f552f7b +- name: github.com/spf13/pflag + version: 4c012f6dcd9546820e378d0bdda4d8fc772cdfea +- name: github.com/spf13/viper + version: 25b30aa063fc18e48662b86996252eabdcf2f0c7 +- name: golang.org/x/crypto + version: 81e90905daefcd6fd217b62423c0908922eadb30 + subpackages: + - ssh/terminal +- name: golang.org/x/net + version: 1c05540f6879653db88113bc4a2b70aec4bd491f + subpackages: + - context + - http2 + - http2/hpack + - idna + - lex/httplex +- name: golang.org/x/sys + version: 95c6576299259db960f6c5b9b69ea52422860fce + subpackages: + - unix + - windows +- name: golang.org/x/text + version: b19bf474d317b857955b12035d2c5acb57ce8b01 + subpackages: + - cases + - internal + - internal/tag + - language + - runes + - secure/bidirule + - secure/precis + - transform + - unicode/bidi + - unicode/norm + - width +- name: gopkg.in/inf.v0 + version: 3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4 +- name: gopkg.in/yaml.v2 + version: 287cf08546ab5e7e37d55a84f7ed3fd1db036de5 +- name: k8s.io/api + version: 11147472b7c934c474a2c484af3c0c5210b7a3af + subpackages: + - admissionregistration/v1alpha1 + - 
admissionregistration/v1beta1 + - apps/v1 + - apps/v1beta1 + - apps/v1beta2 + - authentication/v1 + - authentication/v1beta1 + - authorization/v1 + - authorization/v1beta1 + - autoscaling/v1 + - autoscaling/v2beta1 + - batch/v1 + - batch/v1beta1 + - batch/v2alpha1 + - certificates/v1beta1 + - core/v1 + - events/v1beta1 + - extensions/v1beta1 + - imagepolicy/v1alpha1 + - networking/v1 + - policy/v1beta1 + - rbac/v1 + - rbac/v1alpha1 + - rbac/v1beta1 + - scheduling/v1alpha1 + - settings/v1alpha1 + - storage/v1 + - storage/v1alpha1 + - storage/v1beta1 +- name: k8s.io/apimachinery + version: 180eddb345a5be3a157cea1c624700ad5bd27b8f + subpackages: + - pkg/api/errors + - pkg/api/meta + - pkg/api/resource + - pkg/apis/meta/internalversion + - pkg/apis/meta/v1 + - pkg/apis/meta/v1/unstructured + - pkg/apis/meta/v1alpha1 + - pkg/conversion + - pkg/conversion/queryparams + - pkg/fields + - pkg/labels + - pkg/runtime + - pkg/runtime/schema + - pkg/runtime/serializer + - pkg/runtime/serializer/json + - pkg/runtime/serializer/protobuf + - pkg/runtime/serializer/recognizer + - pkg/runtime/serializer/streaming + - pkg/runtime/serializer/versioning + - pkg/selection + - pkg/types + - pkg/util/cache + - pkg/util/clock + - pkg/util/diff + - pkg/util/errors + - pkg/util/framer + - pkg/util/intstr + - pkg/util/json + - pkg/util/net + - pkg/util/runtime + - pkg/util/sets + - pkg/util/validation + - pkg/util/validation/field + - pkg/util/wait + - pkg/util/yaml + - pkg/version + - pkg/watch + - third_party/forked/golang/reflect +- name: k8s.io/client-go + version: 78700dec6369ba22221b72770783300f143df150 + subpackages: + - discovery + - kubernetes + - kubernetes/scheme + - kubernetes/typed/admissionregistration/v1alpha1 + - kubernetes/typed/admissionregistration/v1beta1 + - kubernetes/typed/apps/v1 + - kubernetes/typed/apps/v1beta1 + - kubernetes/typed/apps/v1beta2 + - kubernetes/typed/authentication/v1 + - kubernetes/typed/authentication/v1beta1 + - kubernetes/typed/authorization/v1 + - 
kubernetes/typed/authorization/v1beta1 + - kubernetes/typed/autoscaling/v1 + - kubernetes/typed/autoscaling/v2beta1 + - kubernetes/typed/batch/v1 + - kubernetes/typed/batch/v1beta1 + - kubernetes/typed/batch/v2alpha1 + - kubernetes/typed/certificates/v1beta1 + - kubernetes/typed/core/v1 + - kubernetes/typed/events/v1beta1 + - kubernetes/typed/extensions/v1beta1 + - kubernetes/typed/networking/v1 + - kubernetes/typed/policy/v1beta1 + - kubernetes/typed/rbac/v1 + - kubernetes/typed/rbac/v1alpha1 + - kubernetes/typed/rbac/v1beta1 + - kubernetes/typed/scheduling/v1alpha1 + - kubernetes/typed/settings/v1alpha1 + - kubernetes/typed/storage/v1 + - kubernetes/typed/storage/v1alpha1 + - kubernetes/typed/storage/v1beta1 + - pkg/version + - rest + - rest/watch + - tools/auth + - tools/cache + - tools/clientcmd + - tools/clientcmd/api + - tools/clientcmd/api/latest + - tools/clientcmd/api/v1 + - tools/metrics + - tools/pager + - tools/reference + - transport + - util/buffer + - util/cert + - util/flowcontrol + - util/homedir + - util/integer +- name: k8s.io/kube-openapi + version: 39a7bf85c140f972372c2a0d1ee40adbf0c8bfe1 + subpackages: + - pkg/common +testImports: [] diff --git a/glide.yaml b/glide.yaml new file mode 100644 index 0000000..2ec370e --- /dev/null +++ b/glide.yaml @@ -0,0 +1,25 @@ +package: github.com/deliveroo/paddle +import: +- package: github.com/aws/aws-sdk-go + version: ^1.12.52 + subpackages: + - aws + - aws/session + - service/s3 + - service/s3/s3manager +- package: github.com/mitchellh/go-homedir +- package: github.com/spf13/afero + version: ^1.0.0 +- package: github.com/spf13/cobra + version: ^0.0.1 +- package: github.com/spf13/viper + version: ^1.0.0 +- package: k8s.io/client-go + version: v6.0.0 +- package: github.com/gregjones/httpcache +- package: github.com/imdario/mergo +- package: gopkg.in/yaml.v2 +- package: github.com/davecgh/go-spew + version: ^1.1.0 + subpackages: + - spew diff --git a/release.sh b/release.sh new file mode 100755 index 
0000000..ca4b284
--- /dev/null
+++ b/release.sh
@@ -0,0 +1,33 @@
+#!/usr/bin/env bash
+#
+# Release helper: commits the VERSION bump, tags it as v<VERSION>, pushes the
+# tag to origin and runs goreleaser. Uses relative paths, so run it from the
+# repository root.
+
+set -e
+set -x
+
+# Print a message on stderr and abort the release.
+die() { echo "$*" 1>&2 ; exit 1; }
+
+# Read through a redirection and check the result: in a `cat VERSION | tr`
+# pipeline only tr's exit status is visible to `set -e`, so a missing file
+# would not stop the script before `go generate` rewrites cli/version.go.
+VERSION=$(tr -d '\n' < VERSION) || die "Cannot read VERSION"
+[ -n "$VERSION" ] || die "VERSION is empty"
+
+# Only the index is checked: VERSION is expected to be edited but uncommitted.
+git diff-index --quiet --cached HEAD -- || die "Index dirty, commit first"
+
+# Regenerate cli/version.go from VERSION, then commit both files.
+go generate ./cli
+git add VERSION
+git add cli/version.go
+# A failing commit is tolerated (presumably when nothing changed) so the
+# release can still be tagged.
+git commit -m "Version $VERSION" || echo "Version not changed"
+
+# Quote the tag name so an unexpected character in VERSION cannot split it.
+git tag -a "v$VERSION" -m "Version $VERSION"
+git push origin "v$VERSION"
+goreleaser