Skip to content
This repository has been archived by the owner on Dec 9, 2022. It is now read-only.

Add pipeline command #3

Merged
merged 17 commits into from
Jan 3, 2018
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
tmp
dist
paddle
vendor
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ git clone [email protected]:deliveroo/paddle.git
cd paddle
```

Install dependencies:

```
brew install glide
glide i
```

You will need create a `$HOME/.paddle.yaml` that contains the bucket name, e.g:

```
Expand All @@ -37,6 +44,12 @@ region=eu-west-1
$ go build
```

## Testing

```
$ go test ./...
```

## Release

In order to release a new version, set up github export GITHUB_TOKEN=[YOUR_TOKEN] and do the following steps:
Expand Down
28 changes: 28 additions & 0 deletions cli/pipeline/cmd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright © 2017 RooFoods LTD
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pipeline

import (
"github.com/spf13/cobra"
)

var PipelineCmd = &cobra.Command{
Use: "pipeline",
Short: "Manage Canoe pipelines",
Long: "Run and control Canoe pipelines",
}

func init() {
PipelineCmd.AddCommand(runCmd)
}
33 changes: 33 additions & 0 deletions cli/pipeline/kubernetes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package pipeline

import (
"flag"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"os"
"path/filepath"
)

func getKubernetesConfig() (*rest.Config, error) {
var config *rest.Config
var kubeconfig *string
if home := homeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
flag.Parse()

config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
config, err = rest.InClusterConfig()
}
return config, err
}

func homeDir() string {
if h := os.Getenv("HOME"); h != "" {
return h
}
return os.Getenv("USERPROFILE") // windows
}
50 changes: 50 additions & 0 deletions cli/pipeline/pipeline_definition.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package pipeline

import (
"gopkg.in/yaml.v2"
"log"
"regexp"
)

type PipelineDefinitionStep struct {
Step string `yaml:"step"`
Version string `yaml:"version"`
Branch string `yaml:"branch"`
Image string `yaml:"image"`
Inputs []struct {
Step string `yaml:"step"`
Version string `yaml:"version"`
Branch string `yaml:"branch"`
Path string `yaml:"path"`
} `yaml:"inputs"`
Commands []string `yaml:"commands"`
Resources struct {
CPU int `yaml:"cpu"`
Memory string `yaml:"memory"`
} `yaml:"resources"`
}

type PipelineDefinition struct {
Pipeline string `yaml:"pipeline"`
Bucket string `yaml:"bucket"`
Namespace string `yaml:"namespace"`
Steps []PipelineDefinitionStep `yaml:"steps"`
}

func parsePipeline(data []byte) *PipelineDefinition {
pipeline := PipelineDefinition{}

err := yaml.Unmarshal(data, &pipeline)
if err != nil {
log.Fatalf("error: %v", err)
}

// For compatibility with Ansible executor
r, _ := regexp.Compile("default\\('(.+)'\\)")
matches := r.FindStringSubmatch(pipeline.Bucket)
if matches != nil && matches[1] != "" {
pipeline.Bucket = matches[1]
}

return &pipeline
}
22 changes: 22 additions & 0 deletions cli/pipeline/pipeline_definition_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package pipeline

import (
"io/ioutil"
"testing"
)

func TestParsePipeline(t *testing.T) {
data, err := ioutil.ReadFile("test/sample_steps_passing.yml")
if err != nil {
panic(err.Error())
}
pipeline := parsePipeline(data)

if len(pipeline.Steps) != 2 {
t.Errorf("excepted two steps, got %i", len(pipeline.Steps))
}

if pipeline.Bucket != "canoe-sample-pipeline" {
t.Errorf("Expected bucket to be canoe-sample-pipeline, got %s", pipeline.Bucket)
}
}
193 changes: 193 additions & 0 deletions cli/pipeline/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
// Copyright © 2017 RooFoods LTD
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pipeline

import (
"context"
"errors"
"fmt"
"github.com/spf13/cobra"
"io/ioutil"
"k8s.io/api/core/v1"
k8errors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/kubernetes"
"log"
"time"
)

type runCmdFlagsStruct struct {
StepName string
BucketName string
TailLogs bool
DeletePollInterval time.Duration
}

const defaultDeletePollInterval = 2 * time.Second
const deleteTimeout = 120 * time.Second

var runCmdFlags *runCmdFlagsStruct
var clientset kubernetes.Interface

var logFatalf = log.Fatalf

var runCmd = &cobra.Command{
Use: "run [pipeline_yaml]",
Short: "Run a pipeline or a pipeline step",
Args: cobra.ExactArgs(1),
Long: `Store data into S3 under a versioned path, and update HEAD.

Example:

$ paddle pipeline run test_pipeline.yaml
`,
Run: func(cmd *cobra.Command, args []string) {
runPipeline(args[0], runCmdFlags)
},
}

func init() {
runCmdFlags = &runCmdFlagsStruct{}
runCmd.Flags().StringVarP(&runCmdFlags.StepName, "step", "s", "", "Single step to execute")
runCmd.Flags().StringVarP(&runCmdFlags.BucketName, "bucket", "b", "", "Bucket name")
runCmd.Flags().BoolVarP(&runCmdFlags.TailLogs, "logs", "l", true, "Tail logs")
runCmdFlags.DeletePollInterval = defaultDeletePollInterval

config, err := getKubernetesConfig()
if err != nil {
panic(err.Error())
}
clientset, err = kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
}

func runPipeline(path string, flags *runCmdFlagsStruct) {
data, err := ioutil.ReadFile(path)
if err != nil {
panic(err.Error())
}
pipeline := parsePipeline(data)
if flags.BucketName != "" {
pipeline.Bucket = flags.BucketName
}

for _, step := range pipeline.Steps {
if flags.StepName != "" && step.Step != flags.StepName {
continue
}
err = runPipelineStep(pipeline, &step, flags)
if err != nil {
logFatalf("[paddle] %s", err.Error())
}
}
}

func runPipelineStep(pipeline *PipelineDefinition, step *PipelineDefinitionStep, flags *runCmdFlagsStruct) error {
log.Printf("[paddle] Running step %s", step.Step)
podDefinition := NewPodDefinition(pipeline, step)
stepPodBuffer := podDefinition.compile()
pod := &v1.Pod{}
yaml.NewYAMLOrJSONDecoder(stepPodBuffer, 4096).Decode(pod)
pods := clientset.CoreV1().Pods(pipeline.Namespace)

err := deleteAndWait(clientset, podDefinition, flags)
if err != nil {
return err
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

watch, err := Watch(ctx, clientset, pod)
if err != nil {
return err
}

pod, err = pods.Create(pod)
if err != nil {
return err
}

containers := make(map[string]bool)

for {
e := <-watch
switch e.Type {
case Added:
log.Printf("[paddle] Container %s/%s starting", pod.Name, e.Container)
containers[e.Container] = true
if flags.TailLogs {
TailLogs(ctx, clientset, e.Pod, e.Container)
}
case Deleted:
case Removed:
log.Printf("[paddle] Container removed: %s", e.Container)
continue
case Completed:
log.Printf("[paddle] Pod execution completed")
return nil
case Failed:
var msg string
if e.Container != "" {
if e.Message != "" {
msg = fmt.Sprintf("Container %s/%s failed: '%s'", pod.Name, e.Container, e.Message)
} else {
msg = fmt.Sprintf("Container %s/%s failed", pod.Name, e.Container)
}
_, present := containers[e.Container]
if !present && flags.TailLogs { // container died before being added
TailLogs(ctx, clientset, e.Pod, e.Container)
time.Sleep(3 * time.Second) // give it time to tail logs
}
} else {
msg = "Pod failed"
}
return errors.New(msg)
}
}

log.Printf("[paddle] Finishing pod execution")
return nil
}

func deleteAndWait(c kubernetes.Interface, podDefinition *PodDefinition, flags *runCmdFlagsStruct) error {
pods := clientset.CoreV1().Pods(podDefinition.Namespace)
deleting := false
err := wait.PollImmediate(flags.DeletePollInterval, deleteTimeout, func() (bool, error) {
var err error
err = pods.Delete(podDefinition.PodName, &metav1.DeleteOptions{})
if err != nil {
if k8errors.IsNotFound(err) {
if deleting {
log.Printf("[paddle] deleted pod %s", podDefinition.PodName)
}
return true, nil
} else {
return true, err
}
}
if deleting {
log.Print("[paddle] .")
} else {
log.Printf("[paddle] deleting pod %s", podDefinition.PodName)
deleting = true
}
return false, nil
})
return err
}
Loading