From a315186f41633f7741a4dbda634927a572ccf77a Mon Sep 17 00:00:00 2001 From: Wander Lairson Costa Date: Tue, 16 Oct 2018 18:04:32 -0300 Subject: [PATCH] Bug 1502337: Initial support for docker-worker payloads This commit implements the initial support for docker-worker payloads. So far, only the Image, Command, Env and MaxRunTime fields are supported. It hasn't been extensive tested so features like task cancellation may not work. Also, it doesn't implement custom artifacts and more extensive tests. The support for docker-worker payloads is achieved through build flags. To build with docker-worker paylod, build it with the "docker" flag, like so: $ go build -tags=docker Notice that generic-worker will not be supported in this build. We also needed to upgrade golang due to a bug in the go compiler [1]. [1] https://github.com/golang/go/issues/25908 --- .gitignore | 3 + .taskcluster.yml | 76 ++-- .travis.yml | 2 +- artifacts_test.go | 9 +- aws_test.go | 2 + chain_of_trust_all-unix-style.go | 2 +- chain_of_trust_docker.go | 11 + docker_linux.go | 46 -- dockerworker/artifact.go | 98 ++++ dockerworker/artifact_test.go | 39 ++ dockerworker/doc.go | 4 + dockerworker/dockerimage.go | 148 ++++++ dockerworker/dockerimage_test.go | 63 +++ dockerworker/dockerworker.go | 77 ++++ dockerworker/log.go | 20 + dockerworker/payload.go | 426 +++++++++++++----- dockerworker/payload.json | 307 +++++++++++++ dockerworker/renameimage.go | 155 +++++++ dockerworker/renameimage_test.go | 33 ++ dockerworker/tarstream.go | 67 +++ dockerworker/tarstream_test.go | 98 ++++ dockerworker/util.go | 24 + dockerworker/util_test.go | 126 ++++++ imageid_test.go | 9 + intermittent_task_test.go | 2 + main.go | 72 +-- main_test.go | 2 + mounts_test.go | 2 + os_groups_all-unix-style_test.go | 2 +- os_groups_windows_test.go | 2 + payload_test.go | 2 + plat_all-unix-style.go | 2 +- plat_darwin.go | 2 + plat_docker.go | 145 ++++++ plat_windows.go | 2 + process/logininfo_windows.go | 2 + ...ss_darwin.go => process_all-unix-style.go} | 2 + process/process_docker.go | 152 +++++++ process/process_docker_test.go | 52 +++ process/process_linux.go | 139 ------ process/process_windows.go | 2 + supersede_test.go | 2 + taskcluster_proxy_test.go | 2 + taskstatus_test.go | 2 + validate.go | 81 ++++ validate_docker.go | 59 +++ 46 files changed, 2167 insertions(+), 408 deletions(-) create mode 100644 chain_of_trust_docker.go delete mode 100644 docker_linux.go create mode 100644 dockerworker/artifact.go create mode 100644 dockerworker/artifact_test.go create mode 100644 dockerworker/doc.go create mode 100644 dockerworker/dockerimage.go create mode 100644 dockerworker/dockerimage_test.go create mode 100644 dockerworker/dockerworker.go create mode 100644 dockerworker/log.go create mode 100644 dockerworker/payload.json create mode 100644 dockerworker/renameimage.go create mode 100644 dockerworker/renameimage_test.go create mode 100644 dockerworker/tarstream.go create mode 100644 dockerworker/tarstream_test.go create mode 100644 dockerworker/util.go create mode 100644 dockerworker/util_test.go create mode 100644 imageid_test.go create mode 100644 plat_docker.go rename process/{process_darwin.go => process_all-unix-style.go} (99%) create mode 100644 process/process_docker.go create mode 100644 process/process_docker_test.go delete mode 100644 process/process_linux.go create mode 100644 validate.go create mode 100644 validate_docker.go diff --git a/.gitignore b/.gitignore index 6ce29864..8f466748 100644 --- a/.gitignore +++ b/.gitignore @@ -47,3 +47,6 @@ directory-caches.json file-caches.json tasks-resolved-count.txt /revision.txt + +# binary file +generic-worker diff --git a/.taskcluster.yml b/.taskcluster.yml index 522e04d9..155d9f68 100644 --- a/.taskcluster.yml +++ b/.taskcluster.yml @@ -70,8 +70,8 @@ tasks: maxRunTime: 3600 command: - set CGO_ENABLED=0 - - set GOPATH=%CD%\gopath1.10.3 - - set GOROOT=%CD%\go1.10.3\go + - set GOPATH=%CD%\gopath1.10.4 + - set GOROOT=%CD%\go1.10.4\go - set PATH=%CD%\git\cmd;%GOPATH%\bin;%GOROOT%\bin;%PATH% - git config --global core.autocrlf false - go version @@ -111,16 +111,16 @@ tasks: - ineffassign . artifacts: - name: public/build/generic-worker-windows-amd64.exe - path: gopath1.10.3\bin\generic-worker.exe + path: gopath1.10.4\bin\generic-worker.exe expires: "{{ '2 weeks' | $fromNow }}" type: file mounts: - cacheName: generic-worker-checkout - directory: gopath1.10.3\src + directory: gopath1.10.4\src - content: - url: https://storage.googleapis.com/golang/go1.10.3.windows-amd64.zip - sha256: a3f19d4fc0f4b45836b349503e347e64e31ab830dedac2fc9c390836d4418edb - directory: go1.10.3 + url: https://storage.googleapis.com/golang/go1.10.4.windows-amd64.zip + sha256: 5499aa98399664df8dc1da5c3aaaed14b3130b79c713b5677a0ee9e93854476c + directory: go1.10.4 format: zip - content: url: https://github.com/git-for-windows/git/releases/download/v2.14.1.windows.1/MinGit-2.14.1-64-bit.zip @@ -152,8 +152,8 @@ tasks: maxRunTime: 3600 command: - set CGO_ENABLED=0 - - set GOPATH=%CD%\gopath1.10.3 - - set GOROOT=%CD%\go1.10.3\go + - set GOPATH=%CD%\gopath1.10.4 + - set GOROOT=%CD%\go1.10.4\go - set PATH=%CD%\git\bin;%GOPATH%\bin;%GOROOT%\bin;%PATH% - git config --global core.autocrlf false - go version @@ -182,16 +182,16 @@ tasks: - ineffassign . artifacts: - name: public/build/generic-worker-windows-386.exe - path: gopath1.10.3\bin\generic-worker.exe + path: gopath1.10.4\bin\generic-worker.exe expires: "{{ '2 weeks' | $fromNow }}" type: file mounts: - cacheName: generic-worker-checkout - directory: gopath1.10.3\src + directory: gopath1.10.4\src - content: - url: https://storage.googleapis.com/golang/go1.10.3.windows-386.zip - sha256: 89696a29bdf808fa9861216a21824ae8eb2e750a54b1424ce7f2a177e5cd1466 - directory: go1.10.3 + url: https://storage.googleapis.com/golang/go1.10.4.windows-386.zip + sha256: 407e5619048c427de4a65b26edb17d54c220f8c30ebd358961b1785a38394ec9 + directory: go1.10.4 format: zip - content: url: https://github.com/git-for-windows/git/releases/download/v2.11.0.windows.3/Git-2.11.0.3-32-bit.tar.bz2 @@ -223,8 +223,8 @@ tasks: maxRunTime: 3600 command: - set CGO_ENABLED=0 - - set GOPATH=%CD%\gopath1.10.3 - - set GOROOT=%CD%\go1.10.3\go + - set GOPATH=%CD%\gopath1.10.4 + - set GOROOT=%CD%\go1.10.4\go - set PATH=%CD%\git\cmd;%GOPATH%\bin;%GOROOT%\bin;%PATH% - git config --global core.autocrlf false - go version @@ -264,16 +264,16 @@ tasks: - ineffassign . artifacts: - name: public/build/generic-worker-windows-amd64.exe - path: gopath1.10.3\bin\generic-worker.exe + path: gopath1.10.4\bin\generic-worker.exe expires: "{{ '2 weeks' | $fromNow }}" type: file mounts: - cacheName: generic-worker-checkout - directory: gopath1.10.3\src + directory: gopath1.10.4\src - content: - url: https://storage.googleapis.com/golang/go1.10.3.windows-amd64.zip - sha256: a3f19d4fc0f4b45836b349503e347e64e31ab830dedac2fc9c390836d4418edb - directory: go1.10.3 + url: https://storage.googleapis.com/golang/go1.10.4.windows-amd64.zip + sha256: 5499aa98399664df8dc1da5c3aaaed14b3130b79c713b5677a0ee9e93854476c + directory: go1.10.4 format: zip - content: url: https://github.com/git-for-windows/git/releases/download/v2.14.1.windows.1/MinGit-2.14.1-64-bit.zip @@ -308,8 +308,8 @@ tasks: - -vxec - | export CGO_ENABLED=0 - export GOROOT="$(pwd)/go1.10.3/go" - export GOPATH="$(pwd)/gopath1.10.3" + export GOROOT="$(pwd)/go1.10.4/go" + export GOPATH="$(pwd)/gopath1.10.4" export PATH="${GOPATH}/bin:${GOROOT}/bin:${PATH}" go version go env @@ -335,16 +335,16 @@ tasks: ineffassign . artifacts: - name: public/build/generic-worker-darwin-amd64 - path: gopath1.10.3/bin/generic-worker + path: gopath1.10.4/bin/generic-worker expires: "{{ '2 weeks' | $fromNow }}" type: file mounts: - cacheName: generic-worker-checkout - directory: gopath1.10.3/src + directory: gopath1.10.4/src - content: - url: https://storage.googleapis.com/golang/go1.10.3.darwin-amd64.tar.gz - sha256: 131fd430350a3134d352ee75c5ca456cdf4443e492d0527a9651c7c04e2b458d - directory: go1.10.3 + url: https://storage.googleapis.com/golang/go1.10.4.darwin-amd64.tar.gz + sha256: 2ba324f01de2b2ece0376f6d696570a4c5c13db67d00aadfd612adc56feff587 + directory: go1.10.4 format: tar.gz @@ -380,8 +380,8 @@ tasks: # - -vxec # - | # export CGO_ENABLED=0 - # export GOROOT="$(pwd)/go1.10.3/go" - # export GOPATH="$(pwd)/gopath1.10.3" + # export GOROOT="$(pwd)/go1.10.4/go" + # export GOPATH="$(pwd)/gopath1.10.4" # export PATH="${GOPATH}/bin:${GOROOT}/bin:${PATH}" # export CGO_ENABLED=0 # go version @@ -407,16 +407,16 @@ tasks: # ineffassign . # artifacts: # - name: public/build/generic-worker-linux-armv6l - # path: gopath1.10.3/bin/generic-worker + # path: gopath1.10.4/bin/generic-worker # expires: "{{ '2 weeks' | $fromNow }}" # type: file # mounts: # - cacheName: generic-worker-checkout - # directory: gopath1.10.3/src + # directory: gopath1.10.4/src # - content: - # url: https://storage.googleapis.com/golang/go1.10.3.linux-armv6l.tar.gz + # url: https://storage.googleapis.com/golang/go1.10.4.linux-armv6l.tar.gz # sha256: d3df3fa3d153e81041af24f31a82f86a21cb7b92c1b5552fb621bad0320f06b6 - # directory: go1.10.3 + # directory: go1.10.4 # format: tar.gz @@ -472,13 +472,13 @@ tasks: "${GOPATH}/bin/ineffassign" . artifacts: - name: public/build/generic-worker-linux-amd64 - path: gopath1.10.1/bin/generic-worker + path: gopath1.10.4/bin/generic-worker expires: "{{ '2 weeks' | $fromNow }}" type: file mounts: - cacheName: generic-worker-checkout - directory: gopath1.10.1/src + directory: gopath1.10.4/src - content: - url: https://storage.googleapis.com/golang/go1.10.1.linux-amd64.tar.gz - directory: go1.10.1 + url: https://storage.googleapis.com/golang/go1.10.4.linux-amd64.tar.gz + directory: go1.10.4 format: tar.gz diff --git a/.travis.yml b/.travis.yml index 656cca7c..b659a8b9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,6 @@ language: go go: - - "1.10.3" + - "1.10.4" env: - "CGO_ENABLED=0 GIMME_OS=linux GIMME_ARCH=386" diff --git a/artifacts_test.go b/artifacts_test.go index 0dd27ee5..7331d4b1 100644 --- a/artifacts_test.go +++ b/artifacts_test.go @@ -1,3 +1,5 @@ +// +build !docker + package main import ( @@ -13,17 +15,10 @@ import ( "golang.org/x/crypto/openpgp" "golang.org/x/crypto/openpgp/clearsign" - "github.com/taskcluster/slugid-go/slugid" tcclient "github.com/taskcluster/taskcluster-client-go" "github.com/taskcluster/taskcluster-client-go/tcqueue" ) -var ( - // all tests can share taskGroupId so we can view all test tasks in same - // graph later for troubleshooting - taskGroupID = slugid.Nice() -) - func validateArtifacts( t *testing.T, payloadArtifacts []Artifact, diff --git a/aws_test.go b/aws_test.go index 4fb21e8a..828892ee 100644 --- a/aws_test.go +++ b/aws_test.go @@ -1,3 +1,5 @@ +// +build !docker + package main import ( diff --git a/chain_of_trust_all-unix-style.go b/chain_of_trust_all-unix-style.go index ee371da3..8a5fa376 100644 --- a/chain_of_trust_all-unix-style.go +++ b/chain_of_trust_all-unix-style.go @@ -1,4 +1,4 @@ -// +build !windows +// +build !windows,!docker package main diff --git a/chain_of_trust_docker.go b/chain_of_trust_docker.go new file mode 100644 index 00000000..c0c20da7 --- /dev/null +++ b/chain_of_trust_docker.go @@ -0,0 +1,11 @@ +// +build docker + +package main + +func (cot *ChainOfTrustTaskFeature) ensureTaskUserCantReadPrivateCotKey() error { + return nil +} + +func secureSigningKey() error { + return nil +} diff --git a/docker_linux.go b/docker_linux.go deleted file mode 100644 index b206cc2c..00000000 --- a/docker_linux.go +++ /dev/null @@ -1,46 +0,0 @@ -package main - -import ( - "github.com/taskcluster/taskcluster-base-go/scopes" -) - -type DockerFeature struct { -} - -func (feature *DockerFeature) Name() string { - return "Docker" -} - -func (feature *DockerFeature) Initialise() error { - return nil -} - -func (feature *DockerFeature) PersistState() error { - return nil -} - -func (feature *DockerFeature) IsEnabled(task *TaskRun) bool { - return true -} - -type DockerTask struct { - task *TaskRun -} - -func (feature *DockerFeature) NewTaskFeature(task *TaskRun) TaskFeature { - return &DockerTask{ - task: task, - } -} - -func (l *DockerTask) RequiredScopes() scopes.Required { - return scopes.Required{} -} - -func (l *DockerTask) Start() *CommandExecutionError { - return nil -} - -func (l *DockerTask) Stop() *CommandExecutionError { - return nil -} diff --git a/dockerworker/artifact.go b/dockerworker/artifact.go new file mode 100644 index 00000000..0f25e27c --- /dev/null +++ b/dockerworker/artifact.go @@ -0,0 +1,98 @@ +// +build docker + +package dockerworker + +import ( + "errors" + "fmt" + "io" + "net/http" + "net/url" + "strconv" + "time" + + "github.com/cenkalti/backoff" + "github.com/mattetti/filebuffer" + "github.com/mitchellh/ioprogress" +) + +// DownloadArtifact downloads an artifact using exponential backoff algorithm +func (d *DockerWorker) DownloadArtifact(taskID, runID, name string, out io.WriteSeeker) (ret error) { + var u *url.URL + + backoffError := backoff.Retry(func() (err error) { + // rewind out stream + _, ret = out.Seek(0, io.SeekStart) + if ret != nil { + return + } + + if runID == "" { + u, ret = d.Queue.GetLatestArtifact_SignedURL(taskID, name, 24*time.Hour) + } else { + u, ret = d.Queue.GetArtifact_SignedURL(taskID, runID, name, 24*time.Hour) + } + + if ret != nil { + return + } + + d.TaskLogger.Printf("Downloading %s/%s/%s from %s\n", taskID, runID, name, u.String()) + + // Build a custom request object so we can embed a context into it + var req *http.Request + req, ret = http.NewRequest(http.MethodGet, u.String(), nil) + if ret != nil { + return + } + req = req.WithContext(d.Context) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + fmt.Println(err) + return + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + buff := filebuffer.New([]byte{}) + + // Build the error message with the status code and the response body + errorMessage := fmt.Sprintf("Error downloading %s/%s, status=%s", taskID, name, resp.Status) + if _, err2 := io.Copy(buff, resp.Body); err2 == nil { + errorMessage += "\n" + buff.String() + } + + // For status codes other than 5XX there is no point in issuing a retry + if resp.StatusCode >= 300 && resp.StatusCode < 500 { + ret = errors.New(errorMessage) + } else { + err = errors.New(errorMessage) + } + + return + } + + size, err2 := strconv.Atoi(resp.Header.Get("Content-Length")) + if err2 != nil { + ret = err2 + return + } + + // Depending on the implementation, the body can also issue network requests, + // that's why we need to retry if io.Copy fails + _, err = io.Copy(out, &ioprogress.Reader{ + Reader: resp.Body, + Size: int64(size), + DrawFunc: ioprogress.DrawTerminal(d.LivelogWriter), + }) + + return + }, backoff.WithMaxRetries(backoff.WithContext(backoff.NewExponentialBackOff(), d.Context), 3)) + + if ret == nil { + ret = backoffError + } + + return +} diff --git a/dockerworker/artifact_test.go b/dockerworker/artifact_test.go new file mode 100644 index 00000000..41ecf9f7 --- /dev/null +++ b/dockerworker/artifact_test.go @@ -0,0 +1,39 @@ +// +build docker + +package dockerworker + +import ( + "io" + "testing" + + "github.com/mattetti/filebuffer" + "github.com/stretchr/testify/require" +) + +func TestArtifact(t *testing.T) { + d := NewTestDockerWorker(t) + + content := "This is a dummy artifact" + + taskID, err := createDummyTask(d) + require.NoError(t, err) + defer d.Queue.ReportCompleted(taskID, "0") + + require.NoError(t, createDummyArtifact(d, taskID, "public/test", content)) + + // Test Download Latest + buff := filebuffer.New([]byte{}) + require.NoError(t, d.DownloadArtifact(taskID, "", "public/test", buff)) + + _, err = buff.Seek(0, io.SeekStart) + require.NoError(t, err) + require.Equal(t, content, buff.String()) + + // Test Download specific runID + buff = filebuffer.New([]byte{}) + require.NoError(t, d.DownloadArtifact(taskID, "0", "public/test", buff)) + + _, err = buff.Seek(0, io.SeekStart) + require.NoError(t, err) + require.Equal(t, content, buff.String()) +} diff --git a/dockerworker/doc.go b/dockerworker/doc.go new file mode 100644 index 00000000..a02af279 --- /dev/null +++ b/dockerworker/doc.go @@ -0,0 +1,4 @@ +// Package dockerworker implements utility functions to support the dockerworker +// engine. The dockerworker engines aims to implement the docker-worker payloads +// with full backward support for (non-deprecated) features. +package dockerworker diff --git a/dockerworker/dockerimage.go b/dockerworker/dockerimage.go new file mode 100644 index 00000000..9fa58277 --- /dev/null +++ b/dockerworker/dockerimage.go @@ -0,0 +1,148 @@ +// +build docker + +package dockerworker + +import ( + "context" + "encoding/base64" + "fmt" + "io" + "io/ioutil" + "os" + "strings" + + "github.com/DataDog/zstd" + "github.com/cenkalti/backoff" + docker "github.com/fsouza/go-dockerclient" + "github.com/pierrec/lz4" + "github.com/taskcluster/taskcluster-client-go/tcindex" +) + +func MakeImageName(taskID, runID, artifactName string) string { + return strings.ToLower(base64.RawStdEncoding.EncodeToString([]byte(taskID + runID + artifactName))) +} + +// Ensure image pulls the given image if it doesn't exist +func EnsureImage(ctx context.Context, cli *docker.Client, imageName string, log io.Writer) (img *docker.Image, err error) { + if img, err = cli.InspectImage(imageName); err == nil { + return + } + + err = backoff.Retry(func() error { + return cli.PullImage(docker.PullImageOptions{ + Repository: imageName, + OutputStream: log, + Context: ctx, + }, docker.AuthConfiguration{}) + }, backoff.WithMaxRetries(backoff.WithContext(backoff.NewExponentialBackOff(), ctx), 3)) + + if err != nil { + return + } + + return cli.InspectImage(imageName) +} + +// LoadArtifactImage loads an image from a task artifact +func (d *DockerWorker) LoadArtifactImage(taskID, runID, name string) (img *docker.Image, err error) { + if runID != "" { + d.TaskLogger.Printf("Loading image %s/%s/%s", taskID, runID, name) + } else { + d.TaskLogger.Printf("Loading image %s/%s", taskID, name) + } + + imageName := MakeImageName(taskID, runID, name) + + // If we already downloaded the image, just return it + img, err = d.Client.InspectImage(imageName) + if err == nil { + return + } + + // The downloaded image is written to a file because it might be + // a huge image + f, err := ioutil.TempFile(os.TempDir(), "image") + if err != nil { + return + } + + defer os.Remove(f.Name()) + defer f.Close() + + if err = d.DownloadArtifact(taskID, runID, name, f); err != nil { + return + } + + // rewind the file pointer to read the downloaded file + if _, err = f.Seek(0, io.SeekStart); err != nil { + return + } + + var r io.Reader + + // We accept the image in three formats: .tar, .tar.lz4 and .tar.zst + switch { + case strings.HasSuffix(name, ".zst"): + rc := zstd.NewReader(f) + defer rc.Close() + r = rc + case strings.HasSuffix(name, ".lz4"): + r = lz4.NewReader(f) + case strings.HasSuffix(name, ".tar"): + r = f + default: + err = fmt.Errorf("Not supported format for image artifact %s", name) + return + } + + // We need to rename the image name to avoid cache poisoning. + // Again we store it in a temporary file to avoid exhausting + // the memory + t, err := ioutil.TempFile(os.TempDir(), "image.tar") + if err != nil { + return + } + + defer os.Remove(t.Name()) + defer t.Close() + + // As we cache images, we need to give it a unique name, + // other wise an attacker can use a cache poisoning attack + // to make a task load malicious image. Ref: Bug 1389719 + err = renameDockerImageTarStream(imageName, r, t) + if err != nil { + return + } + + if _, err = t.Seek(0, io.SeekStart); err != nil { + return + } + + d.Client.LoadImage(docker.LoadImageOptions{ + Context: d.Context, + InputStream: t, + OutputStream: d.LivelogWriter, + }) + + img, err = d.Client.InspectImage(imageName) + + return +} + +func (d *DockerWorker) LoadIndexedImage(namespace, path string) (*docker.Image, error) { + d.TaskLogger.Printf("Loading image %s:%s", namespace, path) + + index := tcindex.NewFromEnv() + + task, err := index.FindTask(namespace) + if err != nil { + return nil, err + } + + return d.LoadArtifactImage(task.TaskID, "", path) +} + +func (d *DockerWorker) LoadImage(name string) (*docker.Image, error) { + d.TaskLogger.Printf("Loading image %s", name) + return EnsureImage(d.Context, d.Client, name, d.LivelogWriter) +} diff --git a/dockerworker/dockerimage_test.go b/dockerworker/dockerimage_test.go new file mode 100644 index 00000000..8d3b22e6 --- /dev/null +++ b/dockerworker/dockerimage_test.go @@ -0,0 +1,63 @@ +// +build docker + +package dockerworker + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +const ( + // imageTaskID is the task containing the sample artifacts images for test + imageTaskID = "Xx0aPfyOTU2o_0FZnr_AJg" + testNamespace = "garbage.docker-worker-tests.docker-images" + exampleImage = "coreos/example:1.0.0" +) + +func loadArtifactImageTest(t *testing.T, artifactName string) { + d := NewTestDockerWorker(t) + + imageName := MakeImageName(imageTaskID, "", artifactName) + + img, err := d.LoadArtifactImage(imageTaskID, "", artifactName) + require.NoError(t, err) + defer d.Client.RemoveImage(img.ID) + + loadedImage, err := d.Client.InspectImage(imageName) + require.NoError(t, err) + require.Equal(t, loadedImage.ID, img.ID) +} + +func TestLoadArtifactImageTar(t *testing.T) { + loadArtifactImageTest(t, "public/image.tar") +} + +func TestLoadArtifactImageZst(t *testing.T) { + loadArtifactImageTest(t, "public/image.tar.zst") +} + +func TestLoadArtifactImageLz4(t *testing.T) { + loadArtifactImageTest(t, "public/image.tar.lz4") +} + +func TestLoadIndexedImage(t *testing.T) { + d := NewTestDockerWorker(t) + + img, err := d.LoadIndexedImage(testNamespace, "public/image.tar") + require.NoError(t, err) + + require.NoError(t, d.Client.RemoveImage(img.ID)) +} + +func TestLoadImage(t *testing.T) { + d := NewTestDockerWorker(t) + + img, err := d.LoadImage(exampleImage) + require.NoError(t, err) + defer d.Client.RemoveImage(img.ID) + + img2, err := d.Client.InspectImage(exampleImage) + require.NoError(t, err) + require.Equal(t, img.ID, img2.ID) +} diff --git a/dockerworker/dockerworker.go b/dockerworker/dockerworker.go new file mode 100644 index 00000000..14b5a2ce --- /dev/null +++ b/dockerworker/dockerworker.go @@ -0,0 +1,77 @@ +// +build docker + +package dockerworker + +import ( + "context" + "encoding/json" + "io" + "log" + + docker "github.com/fsouza/go-dockerclient" + "github.com/taskcluster/taskcluster-client-go/tcqueue" + "github.com/xeipuuv/gojsonschema" +) + +var cli *docker.Client + +func init() { + var err error + if cli, err = docker.NewClientFromEnv(); err != nil { + panic(err) + } +} + +type DockerWorker struct { + LivelogWriter io.Writer + Logger *log.Logger + TaskLogger *log.Logger + Context context.Context + Queue *tcqueue.Queue + Client *docker.Client +} + +func New(ctx context.Context, queue *tcqueue.Queue, taskID string, liveLogWriter io.Writer) *DockerWorker { + return &DockerWorker{ + LivelogWriter: liveLogWriter, + Logger: NewLogger(taskID), + TaskLogger: NewTaskLogger(liveLogWriter), + Context: ctx, + Queue: queue, + Client: cli, + } +} + +// ValidatePayload validates the docker worker task payload +func ValidatePayload(payload json.RawMessage) (result *gojsonschema.Result, err error) { + schemaLoader := gojsonschema.NewStringLoader(taskPayloadSchema()) + docLoader := gojsonschema.NewStringLoader(string(payload)) + return gojsonschema.Validate(schemaLoader, docLoader) +} + +// CreateContainer creates a new docker container to run a task +func (d *DockerWorker) CreateContainer(env []string, image *docker.Image, command []string, privileged bool) (container *docker.Container, err error) { + return d.Client.CreateContainer(docker.CreateContainerOptions{ + Config: &docker.Config{ + Image: image.ID, + Cmd: command, + Hostname: "", + User: "", + AttachStdin: false, + AttachStdout: true, + AttachStderr: true, + Tty: true, + OpenStdin: false, + StdinOnce: false, + Env: env, + }, + HostConfig: &docker.HostConfig{ + Privileged: privileged, + ShmSize: 1800000000, + ExtraHosts: []string{ + "localhost.localdomain:127.0.0.1", // Bug 1488148 + }, + }, + Context: d.Context, + }) +} diff --git a/dockerworker/log.go b/dockerworker/log.go new file mode 100644 index 00000000..f7a5bd9c --- /dev/null +++ b/dockerworker/log.go @@ -0,0 +1,20 @@ +// +build docker + +package dockerworker + +import ( + "fmt" + "io" + "log" + "os" +) + +// NewLogger returns a new Logger that will write to standard output +func NewLogger(taskID string) *log.Logger { + return log.New(os.Stdout, fmt.Sprintf("[generic-docker-worker taskID=\"%s\"] ", taskID), 0) +} + +// NewTaskLogger returns a logger that will writer to the task log +func NewTaskLogger(writer io.Writer) *log.Logger { + return log.New(writer, "", 0) +} diff --git a/dockerworker/payload.go b/dockerworker/payload.go index b6fd7e26..26b0f270 100644 --- a/dockerworker/payload.go +++ b/dockerworker/payload.go @@ -1,3 +1,4 @@ +// +build docker // This source code file is AUTO-GENERATED by github.com/taskcluster/jsonschema2go package dockerworker @@ -9,101 +10,65 @@ import ( ) type ( - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/definitions/artifact Artifact struct { - - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/definitions/artifact/properties/expires Expires tcclient.Time `json:"expires,omitempty"` - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/definitions/artifact/properties/path Path string `json:"path"` // Possible values: // * "file" // * "directory" - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/definitions/artifact/properties/type Type string `json:"type"` } // Set of capabilities that must be enabled or made available to the task container Example: ```{ "capabilities": { "privileged": true }``` - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/capabilities Capabilities struct { // Allows devices from the host system to be attached to a task container similar to using `--device` in docker. - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/capabilities/properties/devices Devices Devices `json:"devices,omitempty"` // Allows a task to run in a privileged container, similar to running docker with `--privileged`. This only works for worker-types configured to enable it. // // Default: false - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/capabilities/properties/privileged Privileged bool `json:"privileged,omitempty"` } // Allows devices from the host system to be attached to a task container similar to using `--device` in docker. - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/capabilities/properties/devices Devices struct { // Audio loopback device created using snd-aloop - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/capabilities/properties/devices/properties/loopbackAudio LoopbackAudio bool `json:"loopbackAudio,omitempty"` // Video loopback device created using v4l2loopback. - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/capabilities/properties/devices/properties/loopbackVideo LoopbackVideo bool `json:"loopbackVideo,omitempty"` } // Image to use for the task. Images can be specified as an image tag as used by a docker registry, or as an object declaring type and name/namespace - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/image/oneOf[3] DockerImageArtifact struct { - - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/image/oneOf[3]/properties/path Path string `json:"path"` - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/image/oneOf[3]/properties/taskId TaskID string `json:"taskId"` // Possible values: // * "task-image" - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/image/oneOf[3]/properties/type Type string `json:"type"` } // Image to use for the task. Images can be specified as an image tag as used by a docker registry, or as an object declaring type and name/namespace - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/image/oneOf[0] DockerImageName string // `.payload` field of the queue. - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json# DockerWorkerPayload struct { // Artifact upload map example: ```{"public/build.tar.gz": {"path": "/home/worker/build.tar.gz", "expires": "2016-05-28T16:12:56.693817Z", "type": "file"}}``` - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/artifacts Artifacts map[string]Artifact `json:"artifacts,omitempty"` // Caches are mounted within the docker container at the mount point specified. Example: ```{ "CACHE NAME": "/mount/path/in/container" }``` // // Map entries: - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/cache/additionalProperties - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/cache Cache map[string]string `json:"cache,omitempty"` // Set of capabilities that must be enabled or made available to the task container Example: ```{ "capabilities": { "privileged": true }``` - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/capabilities Capabilities Capabilities `json:"capabilities,omitempty"` // Example: `['/bin/bash', '-c', 'ls']`. @@ -111,17 +76,11 @@ type ( // Default: [] // // Array items: - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/command/items - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/command Command []string `json:"command,omitempty"` // List of base64 encoded asymmetric encrypted environment variables. See /docs/reference/workers/docker-worker/environment#encrypted-environment-variables // // Array items: - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/encryptedEnv/items - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/encryptedEnv EncryptedEnv []string `json:"encryptedEnv,omitempty"` // Example: ``` @@ -132,14 +91,9 @@ type ( // ``` // // Map entries: - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/env/additionalProperties - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/env Env map[string]string `json:"env,omitempty"` // Used to enable additional functionality. - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/features Features FeatureFlags `json:"features,omitempty"` // Image to use for the task. Images can be specified as an image tag as used by a docker registry, or as an object declaring type and name/namespace @@ -149,150 +103,416 @@ type ( // * NamedDockerImage // * IndexedDockerImage // * DockerImageArtifact - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/image Image json.RawMessage `json:"image"` // Specifies a custom location for the livelog artifact - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/log Log string `json:"log,omitempty"` // Maximum time the task container can run in seconds // // Mininum: 1 // Maximum: 86400 - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/maxRunTime MaxRunTime float64 `json:"maxRunTime"` // By default docker-worker will fail a task with a non-zero exit status without retrying. This payload property allows a task owner to define certain exit statuses that will be marked as a retriable exception. - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/onExitStatus OnExitStatus ExitStatusHandling `json:"onExitStatus,omitempty"` // Syntax: ^https?://[\x20-\x7e]*$ - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/supersederUrl SupersederURL string `json:"supersederUrl,omitempty"` } // By default docker-worker will fail a task with a non-zero exit status without retrying. This payload property allows a task owner to define certain exit statuses that will be marked as a retriable exception. - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/onExitStatus ExitStatusHandling struct { // If the task exists with a purge caches exit status, all caches associated with the task will be purged. // // Array items: - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/onExitStatus/properties/purgeCaches/items - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/onExitStatus/properties/purgeCaches PurgeCaches []float64 `json:"purgeCaches,omitempty"` // If the task exists with a retriable exit status, the task will be marked as an exception and a new run created. // // Array items: - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/onExitStatus/properties/retry/items - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/onExitStatus/properties/retry Retry []float64 `json:"retry,omitempty"` } // Used to enable additional functionality. - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/features FeatureFlags struct { // This allows you to use the Linux ptrace functionality inside the container; it is otherwise disallowed by Docker's security policy. - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/features/properties/allowPtrace AllowPtrace bool `json:"allowPtrace,omitempty"` - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/features/properties/artifacts Artifacts bool `json:"artifacts,omitempty"` // The Balrog stage proxy feature allows tasks to make requests to http://balrog which is a proxied connection through a vpn tunnel to the stage balrog update server. - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/features/properties/balrogStageVPNProxy BalrogStageVPNProxy bool `json:"balrogStageVPNProxy,omitempty"` // The Balrog proxy feature allows tasks to make requests to http://balrog which is a proxied connection through a vpn tunnel to production balrog update server. - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/features/properties/balrogVPNProxy BalrogVPNProxy bool `json:"balrogVPNProxy,omitempty"` // Useful if live logging is not interesting but the overalllog is later on - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/features/properties/bulkLog BulkLog bool `json:"bulkLog,omitempty"` // An artifact named chainOfTrust.json.asc should be generated which will include information for downstream tasks to build a level of trust for the artifacts produced by the task and the environment it ran in. - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/features/properties/chainOfTrust ChainOfTrust bool `json:"chainOfTrust,omitempty"` // Runs docker-in-docker and binds `/var/run/docker.sock` into the container. Doesn't allow privileged mode, capabilities or host volume mounts. - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/features/properties/dind Dind bool `json:"dind,omitempty"` // Uploads docker images as artifacts - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/features/properties/dockerSave DockerSave bool `json:"dockerSave,omitempty"` // This allows you to interactively run commands inside the container and attaches you to the stdin/stdout/stderr over a websocket. Can be used for SSH-like access to docker containers. - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/features/properties/interactive Interactive bool `json:"interactive,omitempty"` // Logs are stored on the worker during the duration of tasks and available via http chunked streaming then uploaded to s3 - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/features/properties/localLiveLog LocalLiveLog bool `json:"localLiveLog,omitempty"` // The Releng API proxy service allows tasks to talk to releng api using an authorization token based on the task's scopes - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/features/properties/relengAPIProxy RelengAPIProxy bool `json:"relengAPIProxy,omitempty"` // The auth proxy allows making requests to taskcluster/queue and taskcluster/scheduler directly from your task with the same scopes as set in the task. This can be used to make api calls via the [client](https://github.com/taskcluster/taskcluster-client) CURL, etc... Without embedding credentials in the task. - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/features/properties/taskclusterProxy TaskclusterProxy bool `json:"taskclusterProxy,omitempty"` } // Image to use for the task. Images can be specified as an image tag as used by a docker registry, or as an object declaring type and name/namespace - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/image/oneOf[2] IndexedDockerImage struct { - - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/image/oneOf[2]/properties/namespace Namespace string `json:"namespace"` - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/image/oneOf[2]/properties/path Path string `json:"path"` // Possible values: // * "indexed-image" - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/image/oneOf[2]/properties/type Type string `json:"type"` } // Image to use for the task. Images can be specified as an image tag as used by a docker registry, or as an object declaring type and name/namespace - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/image/oneOf[1] NamedDockerImage struct { - - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/image/oneOf[1]/properties/name Name string `json:"name"` // Possible values: // * "docker-image" - // - // See https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json#/properties/image/oneOf[1]/properties/type Type string `json:"type"` } ) + +// Returns json schema for the payload part of the task definition. Please +// note we use a go string and do not load an external file, since we want this +// to be *part of the compiled executable*. If this sat in another file that +// was loaded at runtime, it would not be burned into the build, which would be +// bad for the following two reasons: +// 1) we could no longer distribute a single binary file that didn't require +// installation/extraction +// 2) the payload schema is specific to the version of the code, therefore +// should be versioned directly with the code and *frozen on build*. +// +// Run `generic-worker show-payload-schema` to output this schema to standard +// out. +func taskPayloadSchema() string { + return `{ + "$schema": "http://json-schema.org/draft-06/schema#", + "additionalProperties": false, + "definitions": { + "artifact": { + "additionalProperties": false, + "properties": { + "expires": { + "format": "date-time", + "title": "Date when artifact should expire must be in the future.", + "type": "string" + }, + "path": { + "title": "Location of artifact in container.", + "type": "string" + }, + "type": { + "enum": [ + "file", + "directory" + ], + "title": "Artifact upload type.", + "type": "string" + } + }, + "required": [ + "type", + "path" + ], + "type": "object" + } + }, + "description": "` + "`" + `.payload` + "`" + ` field of the queue.", + "properties": { + "artifacts": { + "additionalProperties": { + "$ref": "#/definitions/artifact" + }, + "description": "Artifact upload map example: ` + "`" + `` + "`" + `` + "`" + `{\"public/build.tar.gz\": {\"path\": \"/home/worker/build.tar.gz\", \"expires\": \"2016-05-28T16:12:56.693817Z\", \"type\": \"file\"}}` + "`" + `` + "`" + `` + "`" + `", + "title": "Artifacts", + "type": "object" + }, + "cache": { + "additionalProperties": { + "type": "string" + }, + "description": "Caches are mounted within the docker container at the mount point specified. Example: ` + "`" + `` + "`" + `` + "`" + `{ \"CACHE NAME\": \"/mount/path/in/container\" }` + "`" + `` + "`" + `` + "`" + `", + "title": "Caches to mount point mapping.", + "type": "object" + }, + "capabilities": { + "additionalProperties": false, + "description": "Set of capabilities that must be enabled or made available to the task container Example: ` + "`" + `` + "`" + `` + "`" + `{ \"capabilities\": { \"privileged\": true }` + "`" + `` + "`" + `` + "`" + `", + "properties": { + "devices": { + "additionalProperties": false, + "description": "Allows devices from the host system to be attached to a task container similar to using ` + "`" + `--device` + "`" + ` in docker. ", + "properties": { + "loopbackAudio": { + "description": "Audio loopback device created using snd-aloop", + "title": "Loopback Audio device", + "type": "boolean" + }, + "loopbackVideo": { + "description": "Video loopback device created using v4l2loopback.", + "title": "Loopback Video device", + "type": "boolean" + } + }, + "title": "Devices to be attached to task containers", + "type": "object" + }, + "privileged": { + "default": false, + "description": "Allows a task to run in a privileged container, similar to running docker with ` + "`" + `--privileged` + "`" + `. This only works for worker-types configured to enable it.", + "title": "Privileged container", + "type": "boolean" + } + }, + "title": "Capabilities that must be available/enabled for the task container.", + "type": "object" + }, + "command": { + "default": [], + "description": "Example: ` + "`" + `['/bin/bash', '-c', 'ls']` + "`" + `.", + "items": { + "type": "string" + }, + "title": "Docker command to run (see docker api).", + "type": "array" + }, + "encryptedEnv": { + "description": "List of base64 encoded asymmetric encrypted environment variables. See /docs/reference/workers/docker-worker/environment#encrypted-environment-variables", + "items": { + "title": "Base64 encoded encrypted environment variable object.", + "type": "string" + }, + "title": "List of encrypted environment variable mappings.", + "type": "array" + }, + "env": { + "additionalProperties": { + "type": "string" + }, + "description": "Example: ` + "`" + `` + "`" + `` + "`" + `\n{\n \"PATH\": '/borked/path' \n \"ENV_NAME\": \"VALUE\" \n}\n` + "`" + `` + "`" + `` + "`" + `", + "title": "Environment variable mappings.", + "type": "object" + }, + "features": { + "additionalProperties": false, + "description": "Used to enable additional functionality.", + "properties": { + "allowPtrace": { + "description": "This allows you to use the Linux ptrace functionality inside the container; it is otherwise disallowed by Docker's security policy. ", + "title": "Allow ptrace within the container", + "type": "boolean" + }, + "artifacts": { + "description": "", + "title": "Artifact uploads", + "type": "boolean" + }, + "balrogStageVPNProxy": { + "description": "The Balrog stage proxy feature allows tasks to make requests to http://balrog which is a proxied connection through a vpn tunnel to the stage balrog update server.", + "title": "Balrog stage proxy service", + "type": "boolean" + }, + "balrogVPNProxy": { + "description": "The Balrog proxy feature allows tasks to make requests to http://balrog which is a proxied connection through a vpn tunnel to production balrog update server.", + "title": "Balrog proxy service", + "type": "boolean" + }, + "bulkLog": { + "description": "Useful if live logging is not interesting but the overalllog is later on", + "title": "Bulk upload the task log into a single artifact", + "type": "boolean" + }, + "chainOfTrust": { + "description": "An artifact named chainOfTrust.json.asc should be generated which will include information for downstream tasks to build a level of trust for the artifacts produced by the task and the environment it ran in.", + "title": "Enable generation of a openpgp signed Chain of Trust artifact", + "type": "boolean" + }, + "dind": { + "description": "Runs docker-in-docker and binds ` + "`" + `/var/run/docker.sock` + "`" + ` into the container. Doesn't allow privileged mode, capabilities or host volume mounts.", + "title": "Docker in Docker", + "type": "boolean" + }, + "dockerSave": { + "description": "Uploads docker images as artifacts", + "title": "Docker save", + "type": "boolean" + }, + "interactive": { + "description": "This allows you to interactively run commands inside the container and attaches you to the stdin/stdout/stderr over a websocket. Can be used for SSH-like access to docker containers.", + "title": "Docker Exec Interactive", + "type": "boolean" + }, + "localLiveLog": { + "description": "Logs are stored on the worker during the duration of tasks and available via http chunked streaming then uploaded to s3", + "title": "Enable live logging (worker local)", + "type": "boolean" + }, + "relengAPIProxy": { + "description": "The Releng API proxy service allows tasks to talk to releng api using an authorization token based on the task's scopes", + "title": "Releng API proxy service", + "type": "boolean" + }, + "taskclusterProxy": { + "description": "The auth proxy allows making requests to taskcluster/queue and taskcluster/scheduler directly from your task with the same scopes as set in the task. This can be used to make api calls via the [client](https://github.com/taskcluster/taskcluster-client) CURL, etc... Without embedding credentials in the task.", + "title": "Task cluster auth proxy service", + "type": "boolean" + } + }, + "title": "Feature flags", + "type": "object" + }, + "image": { + "description": "Image to use for the task. Images can be specified as an image tag as used by a docker registry, or as an object declaring type and name/namespace", + "oneOf": [ + { + "title": "Docker image name", + "type": "string" + }, + { + "additionalProperties": false, + "properties": { + "name": { + "type": "string" + }, + "type": { + "enum": [ + "docker-image" + ], + "type": "string" + } + }, + "required": [ + "type", + "name" + ], + "title": "Named docker image", + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "namespace": { + "type": "string" + }, + "path": { + "type": "string" + }, + "type": { + "enum": [ + "indexed-image" + ], + "type": "string" + } + }, + "required": [ + "type", + "namespace", + "path" + ], + "title": "Indexed docker image", + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "path": { + "type": "string" + }, + "taskId": { + "type": "string" + }, + "type": { + "enum": [ + "task-image" + ], + "type": "string" + } + }, + "required": [ + "type", + "taskId", + "path" + ], + "title": "Docker image artifact", + "type": "object" + } + ], + "title": "Docker image." + }, + "log": { + "description": "Specifies a custom location for the livelog artifact", + "title": "Custom log location", + "type": "string" + }, + "maxRunTime": { + "description": "Maximum time the task container can run in seconds", + "maximum": 86400, + "minimum": 1, + "multipleOf": 1, + "title": "Maximum run time in seconds", + "type": "number" + }, + "onExitStatus": { + "additionalProperties": false, + "description": "By default docker-worker will fail a task with a non-zero exit status without retrying. This payload property allows a task owner to define certain exit statuses that will be marked as a retriable exception.", + "properties": { + "purgeCaches": { + "description": "If the task exists with a purge caches exit status, all caches associated with the task will be purged.", + "items": { + "title": "Exit statuses", + "type": "number" + }, + "title": "Purge caches exit status", + "type": "array" + }, + "retry": { + "description": "If the task exists with a retriable exit status, the task will be marked as an exception and a new run created.", + "items": { + "title": "Exit statuses", + "type": "number" + }, + "title": "Retriable exit statuses", + "type": "array" + } + }, + "title": "Exit status handling", + "type": "object" + }, + "supersederUrl": { + "format": "uri", + "pattern": "^https?://[\\x20-\\x7e]*$", + "title": "URL of the a service that can indicate tasks superseding this one; the current taskId will be appended as a query argument ` + "`" + `taskId` + "`" + `. The service should return an object with a ` + "`" + `supersedes` + "`" + ` key containing a list of taskIds, including the supplied taskId. The tasks should be ordered such that each task supersedes all tasks appearing earlier in the list. See [superseding](/docs/reference/platform/taskcluster-queue/docs/superseding) for more detail.", + "type": "string" + } + }, + "required": [ + "image", + "maxRunTime" + ], + "title": "Docker worker payload", + "type": "object" +}` +} diff --git a/dockerworker/payload.json b/dockerworker/payload.json new file mode 100644 index 00000000..2e41d9b6 --- /dev/null +++ b/dockerworker/payload.json @@ -0,0 +1,307 @@ +{ + "$schema": "http://json-schema.org/draft-06/schema#", + "definitions": { + "artifact": { + "type": "object", + "additionalProperties": false, + "properties": { + "type": { + "title": "Artifact upload type.", + "type": "string", + "enum": [ + "file", + "directory" + ] + }, + "path": { + "title": "Location of artifact in container.", + "type": "string" + }, + "expires": { + "title": "Date when artifact should expire must be in the future.", + "type": "string", + "format": "date-time" + } + }, + "required": [ + "type", + "path" + ] + } + }, + "title": "Docker worker payload", + "description": "`.payload` field of the queue.", + "type": "object", + "required": [ + "image", + "maxRunTime" + ], + "additionalProperties": false, + "properties": { + "log": { + "title": "Custom log location", + "description": "Specifies a custom location for the livelog artifact", + "type": "string" + }, + "image": { + "title": "Docker image.", + "description": "Image to use for the task. Images can be specified as an image tag as used by a docker registry, or as an object declaring type and name/namespace", + "oneOf": [ + { + "title": "Docker image name", + "type": "string" + }, + { + "type": "object", + "title": "Named docker image", + "additionalProperties": false, + "properties": { + "type": { + "type": "string", + "enum": [ + "docker-image" + ] + }, + "name": { + "type": "string" + } + }, + "required": [ + "type", + "name" + ] + }, + { + "type": "object", + "title": "Indexed docker image", + "additionalProperties": false, + "properties": { + "type": { + "type": "string", + "enum": [ + "indexed-image" + ] + }, + "namespace": { + "type": "string" + }, + "path": { + "type": "string" + } + }, + "required": [ + "type", + "namespace", + "path" + ] + }, + { + "type": "object", + "title": "Docker image artifact", + "additionalProperties": false, + "properties": { + "type": { + "type": "string", + "enum": [ + "task-image" + ] + }, + "taskId": { + "type": "string" + }, + "path": { + "type": "string" + } + }, + "required": [ + "type", + "taskId", + "path" + ] + } + ] + }, + "cache": { + "title": "Caches to mount point mapping.", + "description": "Caches are mounted within the docker container at the mount point specified. Example: ```{ \"CACHE NAME\": \"/mount/path/in/container\" }```", + "additionalProperties": { + "type": "string" + }, + "type": "object" + }, + "capabilities": { + "title": "Capabilities that must be available/enabled for the task container.", + "description": "Set of capabilities that must be enabled or made available to the task container Example: ```{ \"capabilities\": { \"privileged\": true }```", + "type": "object", + "additionalProperties": false, + "properties": { + "privileged": { + "title": "Privileged container", + "description": "Allows a task to run in a privileged container, similar to running docker with `--privileged`. This only works for worker-types configured to enable it.", + "type": "boolean", + "default": false + }, + "devices": { + "title": "Devices to be attached to task containers", + "description": "Allows devices from the host system to be attached to a task container similar to using `--device` in docker. ", + "type": "object", + "additionalProperties": false, + "properties": { + "loopbackVideo": { + "title": "Loopback Video device", + "description": "Video loopback device created using v4l2loopback.", + "type": "boolean" + }, + "loopbackAudio": { + "title": "Loopback Audio device", + "description": "Audio loopback device created using snd-aloop", + "type": "boolean" + } + } + } + } + }, + "command": { + "title": "Docker command to run (see docker api).", + "type": "array", + "items": { + "type": "string" + }, + "default": [], + "description": "Example: `['/bin/bash', '-c', 'ls']`." + }, + "encryptedEnv": { + "title": "List of encrypted environment variable mappings.", + "description": "List of base64 encoded asymmetric encrypted environment variables. See /docs/reference/workers/docker-worker/environment#encrypted-environment-variables", + "type": "array", + "items": { + "title": "Base64 encoded encrypted environment variable object.", + "type": "string" + } + }, + "env": { + "title": "Environment variable mappings.", + "description": "Example: ```\n{\n \"PATH\": '/borked/path' \n \"ENV_NAME\": \"VALUE\" \n}\n```", + "additionalProperties": { + "type": "string" + }, + "type": "object" + }, + "maxRunTime": { + "type": "number", + "title": "Maximum run time in seconds", + "description": "Maximum time the task container can run in seconds", + "multipleOf": 1, + "minimum": 1, + "maximum": 86400 + }, + "onExitStatus": { + "title": "Exit status handling", + "description": "By default docker-worker will fail a task with a non-zero exit status without retrying. This payload property allows a task owner to define certain exit statuses that will be marked as a retriable exception.", + "type": "object", + "additionalProperties": false, + "properties": { + "retry": { + "title": "Retriable exit statuses", + "description": "If the task exists with a retriable exit status, the task will be marked as an exception and a new run created.", + "type": "array", + "items": { + "title": "Exit statuses", + "type": "number" + } + }, + "purgeCaches": { + "title": "Purge caches exit status", + "description": "If the task exists with a purge caches exit status, all caches associated with the task will be purged.", + "type": "array", + "items": { + "title": "Exit statuses", + "type": "number" + } + } + } + }, + "artifacts": { + "type": "object", + "title": "Artifacts", + "description": "Artifact upload map example: ```{\"public/build.tar.gz\": {\"path\": \"/home/worker/build.tar.gz\", \"expires\": \"2016-05-28T16:12:56.693817Z\", \"type\": \"file\"}}```", + "additionalProperties": { + "$ref": "#/definitions/artifact" + } + }, + "supersederUrl": { + "title": "URL of the a service that can indicate tasks superseding this one; the current taskId will be appended as a query argument `taskId`. The service should return an object with a `supersedes` key containing a list of taskIds, including the supplied taskId. The tasks should be ordered such that each task supersedes all tasks appearing earlier in the list. See [superseding](/docs/reference/platform/taskcluster-queue/docs/superseding) for more detail.", + "type": "string", + "format": "uri", + "pattern": "^https?://[\\x20-\\x7e]*$" + }, + "features": { + "title": "Feature flags", + "description": "Used to enable additional functionality.", + "type": "object", + "additionalProperties": false, + "properties": { + "localLiveLog": { + "type": "boolean", + "title": "Enable live logging (worker local)", + "description": "Logs are stored on the worker during the duration of tasks and available via http chunked streaming then uploaded to s3" + }, + "bulkLog": { + "type": "boolean", + "title": "Bulk upload the task log into a single artifact", + "description": "Useful if live logging is not interesting but the overalllog is later on" + }, + "taskclusterProxy": { + "type": "boolean", + "title": "Task cluster auth proxy service", + "description": "The auth proxy allows making requests to taskcluster/queue and taskcluster/scheduler directly from your task with the same scopes as set in the task. This can be used to make api calls via the [client](https://github.com/taskcluster/taskcluster-client) CURL, etc... Without embedding credentials in the task." + }, + "balrogVPNProxy": { + "type": "boolean", + "title": "Balrog proxy service", + "description": "The Balrog proxy feature allows tasks to make requests to http://balrog which is a proxied connection through a vpn tunnel to production balrog update server." + }, + "balrogStageVPNProxy": { + "type": "boolean", + "title": "Balrog stage proxy service", + "description": "The Balrog stage proxy feature allows tasks to make requests to http://balrog which is a proxied connection through a vpn tunnel to the stage balrog update server." + }, + "artifacts": { + "type": "boolean", + "title": "Artifact uploads", + "description": "" + }, + "dind": { + "type": "boolean", + "title": "Docker in Docker", + "description": "Runs docker-in-docker and binds `/var/run/docker.sock` into the container. Doesn't allow privileged mode, capabilities or host volume mounts." + }, + "relengAPIProxy": { + "type": "boolean", + "title": "Releng API proxy service", + "description": "The Releng API proxy service allows tasks to talk to releng api using an authorization token based on the task's scopes" + }, + "dockerSave": { + "type": "boolean", + "title": "Docker save", + "description": "Uploads docker images as artifacts" + }, + "interactive": { + "type": "boolean", + "title": "Docker Exec Interactive", + "description": "This allows you to interactively run commands inside the container and attaches you to the stdin/stdout/stderr over a websocket. Can be used for SSH-like access to docker containers." + }, + "allowPtrace": { + "type": "boolean", + "title": "Allow ptrace within the container", + "description": "This allows you to use the Linux ptrace functionality inside the container; it is otherwise disallowed by Docker's security policy. " + }, + "chainOfTrust": { + "type": "boolean", + "title": "Enable generation of a openpgp signed Chain of Trust artifact", + "description": "An artifact named chainOfTrust.json.asc should be generated which will include information for downstream tasks to build a level of trust for the artifacts produced by the task and the environment it ran in." + } + } + } + } +} diff --git a/dockerworker/renameimage.go b/dockerworker/renameimage.go new file mode 100644 index 00000000..c4877ebd --- /dev/null +++ b/dockerworker/renameimage.go @@ -0,0 +1,155 @@ +// +build docker + +package dockerworker + +import ( + "archive/tar" + "bytes" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "regexp" + + "github.com/pkg/errors" +) + +var allowedDockerLayerIDPattern = regexp.MustCompile(`^[a-fA-f0-9]{64}$`) + +func isAllowedDockerLayerFile(name string) bool { + if len(name) < 64 || !allowedDockerLayerIDPattern.MatchString(name[0:64]) { + return false + } + switch name[64:] { + case ".json", "/", "/json", "/layer.tar", "/VERSION": + return true + default: + return false + } +} + +// renameDockerImageTarStream renames a tar-stream containing a docker image +// in the `docker save` format. The tar-stream is renamed on-the-fly without +// any files being written to disk. +func renameDockerImageTarStream(imageName string, r io.Reader, w io.Writer) error { + // We call rewriteTarStream to rename the tar-stream on-the-fly + nameWritten := false // check that we found 'repositories' or 'manifest.json' + rwerr := rewriteTarStream(r, w, func(hdr *tar.Header, r io.Reader) (*tar.Header, io.Reader, error) { + // Whenever an entry is reached we switch on the name + switch hdr.Name { + case "repositories", "manifest.json": + nameWritten = true + data, err := ioutil.ReadAll(r) + if err != nil { + return nil, nil, errors.Wrap(err, "failed to read the metadata file from docker image tar-stream") + } + switch hdr.Name { + case "repositories": + data, err = rewriteRepositories(imageName, data) + case "manifest.json": + data, err = rewriteManifest(imageName, data) + default: + panic(errors.New("unreachable")) + } + if err != nil { + return nil, nil, err + } + hdr.Size = int64(len(data)) + return hdr, bytes.NewReader(data), nil + default: + if !isAllowedDockerLayerFile(hdr.Name) { + return nil, nil, fmt.Errorf( + "docker image tar-ball contains illegal file: '%s', consider using an older version of docker", hdr.Name, + ) + } + return hdr, r, nil + } + }) + // If there was no error and name wasn't written at some point, then we have problem + if rwerr == nil && !nameWritten { + return fmt.Errorf( + "docker image tar-ball did contain 'manifest.json' or 'repositories', docker image is invalid", + ) + } + return rwerr +} + +// rewriteRepositories will rewrite the 'repositories' file from a docker image +// tar-stream such that the image has a single tag named :latest +func rewriteRepositories(imageName string, data []byte) ([]byte, error) { + // Rewrite the 'repositories' by reading it as JSON from the input stream + var repositories map[string]map[string]string + err := json.Unmarshal(data, &repositories) + if err != nil { + return nil, fmt.Errorf( + "the 'repositories' file in the docker image tar-ball is not valid JSON, error: %s", + err.Error(), + ) + } + // Find the layerID + var layerID string + for _, tags := range repositories { + for _, ID := range tags { + // if there are multiple tags pointing to different IDs that's problematic + if layerID == "" { + layerID = ID + } else if layerID != ID { + return nil, fmt.Errorf( + "the 'repositories' file in the docker image tar-ball contains multiple tags with different layer IDs", + ) + } + } + } + // If there is no imageID we have a problem + if layerID == "" { + return nil, fmt.Errorf( + "the 'repositories' file in the docker image tar-ball did not contain any layer identifiers, image is invalid", + ) + } + // Create result of the rewrite and return it + data, err = json.Marshal(map[string]map[string]string{ + imageName: {"latest": layerID}, // map from imageName to layerID + }) + if err != nil { + panic(errors.Wrap(err, "json.Marshal failed on map[string]map[string]string that can't happen")) + } + return data, nil +} + +// Type to read from manifest.json +type manifestEntry struct { + Config string `json:"Config,omitempty"` + RepoTags []string `json:"RepoTags"` + Layers []string `json:"Layers"` +} + +// rewriteManifest will rewrite the 'manifest.json' file from a docker image +// tar-stream such that the image has a single tag named :latest +func rewriteManifest(imageName string, data []byte) ([]byte, error) { + // Read the manifest.json from input + var manifest []manifestEntry + err := json.Unmarshal(data, &manifest) + if err != nil { + return nil, fmt.Errorf( + "the 'manifest.json' file in the docker image tar-ball is not valid JSON, error: %s", + err.Error(), + ) + } + if len(manifest) == 0 { + return nil, fmt.Errorf( + "the 'manifest.json' file in the docker image tar-ball did not contain any entries", + ) + } + if len(manifest) > 1 { + return nil, fmt.Errorf( + "the 'manifest.json' file in the docker image tar-ball contains more than one entry", + ) + } + // Rewrite the RepoTags and only the first entry as JSON for result + manifest[0].RepoTags = []string{fmt.Sprintf("%s:latest", imageName)} + data, err = json.Marshal([]manifestEntry{manifest[0]}) + if err != nil { + panic(errors.Wrap(err, "json.Marshal failed on []manifestEntry that can't happen")) + } + return data, nil +} diff --git a/dockerworker/renameimage_test.go b/dockerworker/renameimage_test.go new file mode 100644 index 00000000..90dec38e --- /dev/null +++ b/dockerworker/renameimage_test.go @@ -0,0 +1,33 @@ +// +build docker + +package dockerworker + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestRewriteRepositories(t *testing.T) { + data, err := rewriteRepositories("my-image", []byte( + `{"busybox":{"latest":"374004614a75c2c4afd41a3050b5217e282155eb1eb7b4ce8f22aa9f4b17ee57"}}`, + )) + require.NoError(t, err) + require.Contains(t, string(data), "my-image") + require.NotContains(t, string(data), "busybox") +} + +func TestRewriteManifest(t *testing.T) { + data, err := rewriteManifest("my-image", []byte(`[ + { + "Config": "2b8fd9751c4c0f5dd266fcae00707e67a2545ef34f9a29354585f93dac906749.json", + "RepoTags": ["busybox:latest"], + "Layers": [ + "374004614a75c2c4afd41a3050b5217e282155eb1eb7b4ce8f22aa9f4b17ee57/layer.tar" + ] + } + ]`)) + require.NoError(t, err) + require.Contains(t, string(data), "my-image") + require.NotContains(t, string(data), "busybox") +} diff --git a/dockerworker/tarstream.go b/dockerworker/tarstream.go new file mode 100644 index 00000000..a24f3c72 --- /dev/null +++ b/dockerworker/tarstream.go @@ -0,0 +1,67 @@ +// +build docker + +package dockerworker + +import ( + "archive/tar" + "io" + + "github.com/pkg/errors" +) + +// a tarRewriter is a function that given a tar.Header and io.Reader returns a +// tar.Header and io.Reader to be written in place of the entry given. +// +// If a nil header is returned the entry is skipped, if no transformation is +// desired the header and reader given can be returned with any changes. +type tarRewriter func(header *tar.Header, r io.Reader) (*tar.Header, io.Reader, error) + +// rewriteTarStream rewrites a tar-stream read from r and written to w. +// +// For each entry in the tar-stream the rewriter function is called, if it +// returns an error then the process is aborted, if it returns a nil header the +// entry is skipped, otherwise the returned header and body is rewritten to the +// output tar-stream. Notice that rewriter can return the arguments given +// in-order to let entries pass-through. +func rewriteTarStream(r io.Reader, w io.Writer, rewriter tarRewriter) error { + // Create a tar.Reader and tar.Writer, so we can move files between the two + tr := tar.NewReader(r) + tw := tar.NewWriter(w) + for { + // Read a tar.Header + hdr, err := tr.Next() + if err == io.EOF { + break // we're done reading + } + if err != nil { + return errors.Wrap(err, "failed to read tar.Reader while rewriting tar-stream") + } + + // Allow the rewriter function to rewrite the entry + var hdr2 *tar.Header + var body io.Reader + hdr2, body, err = rewriter(hdr, tr) + if err != nil { + return err + } + if hdr2 == nil { + continue // skip this entry + } + + // Write tar.Header and copy the body without making any changes + err = tw.WriteHeader(hdr2) + if err != nil { + return errors.Wrap(err, "failed to write a tar.Header, while rewriting tar-stream") + } + // Copy file body to target as well + _, err = io.Copy(tw, body) + if err != nil { + return errors.Wrap(err, "failed to write to tar.Writer, while rewriting tar-stream") + } + } + // Close the tar.Writer, this ensure any cached bytes or outstanding errors are raised + if err := tw.Close(); err != nil { + return errors.Wrap(err, "failed to close tar.Writer") + } + return nil +} diff --git a/dockerworker/tarstream_test.go b/dockerworker/tarstream_test.go new file mode 100644 index 00000000..3d9a1644 --- /dev/null +++ b/dockerworker/tarstream_test.go @@ -0,0 +1,98 @@ +// +build docker + +package dockerworker + +import ( + "archive/tar" + "bytes" + "crypto/rand" + "errors" + "fmt" + "io" + "io/ioutil" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestRewriteTarStream(t *testing.T) { + // Create a big random blob + blob := make([]byte, 6*1024*1024) + _, err := rand.Read(blob) + require.NoError(t, err) + + // Create tar file + var input = []struct { + Name string + Data []byte + }{ + {"hello.txt", []byte("hello-world")}, + {"rewritten.txt", []byte("data-to-be-rewritten")}, + {"filtered.txt", blob}, + {"to-rename.txt", []byte("data-in-renamed-file")}, + {"blob.bin", blob}, + } + b := bytes.NewBuffer(nil) + tw := tar.NewWriter(b) + for _, f := range input { + err = tw.WriteHeader(&tar.Header{ + Name: f.Name, + Mode: 0600, + Size: int64(len(f.Data)), + }) + require.NoError(t, err) + _, err = tw.Write(f.Data) + require.NoError(t, err) + } + + // Rewrite the tar file as stream + out := bytes.NewBuffer(nil) + err = rewriteTarStream(b, out, func(hdr *tar.Header, r io.Reader) (*tar.Header, io.Reader, error) { + switch hdr.Name { + case "hello.txt": + return hdr, r, nil + case "rewritten.txt": + data := []byte("data-that-was-rewritten") + hdr.Size = int64(len(data)) + return hdr, bytes.NewReader(data), nil + case "filtered.txt": + return nil, nil, nil + case "to-rename.txt": + hdr.Name = "was-renamed.txt" + return hdr, r, nil + case "blob.bin": + return hdr, r, nil + default: + return nil, nil, errors.New("unhandled file name in test case") + } + }) + require.NoError(t, err) + + // Read the rewritten tar-stream + tr := tar.NewReader(out) + + // declare expected output + var output = []struct { + Name string + Data string + }{ + {"hello.txt", "hello-world"}, + {"rewritten.txt", "data-that-was-rewritten"}, + {"was-renamed.txt", "data-in-renamed-file"}, + {"blob.bin", string(blob)}, + } + for _, f := range output { + fmt.Printf(" - verify %s\n", f.Name) + var hdr *tar.Header + hdr, err = tr.Next() + require.NoError(t, err) + require.Equal(t, f.Name, hdr.Name) + var data []byte + data, err = ioutil.ReadAll(tr) + require.NoError(t, err) + require.Equal(t, f.Data, string(data)) + } + fmt.Println(" - verify that we have EOF") + _, err = tr.Next() + require.Equal(t, io.EOF, err) +} diff --git a/dockerworker/util.go b/dockerworker/util.go new file mode 100644 index 00000000..8f1b8674 --- /dev/null +++ b/dockerworker/util.go @@ -0,0 +1,24 @@ +// +build docker + +package dockerworker + +import ( + "context" + "os" + "testing" + + "github.com/taskcluster/taskcluster-client-go/tcqueue" +) + +const ( + TestProvisionerID = "null-provisioner" + TestWorkerType = "docker-worker" + TestWorkerGroup = "docker-worker" + TestWorkerID = "docker-worker" + TestSchedulerID = "docker-worker-tests" +) + +// NewTestDockerWorker returns a new DockerWorker object for tests +func NewTestDockerWorker(t *testing.T) *DockerWorker { + return New(context.Background(), tcqueue.NewFromEnv(), "testtaskid", os.Stdout) +} diff --git a/dockerworker/util_test.go b/dockerworker/util_test.go new file mode 100644 index 00000000..8d74c030 --- /dev/null +++ b/dockerworker/util_test.go @@ -0,0 +1,126 @@ +// +build docker + +// This module contains utility functions for package tests + +package dockerworker + +import ( + "encoding/json" + "fmt" + "io" + "io/ioutil" + "os" + "strings" + "time" + + "github.com/cenkalti/backoff" + "github.com/taskcluster/slugid-go/slugid" + tcclient "github.com/taskcluster/taskcluster-client-go" + "github.com/taskcluster/taskcluster-client-go/tcqueue" + artifact "github.com/taskcluster/taskcluster-lib-artifact-go" +) + +// scheduleReclaim run a goroutine that will perform reclaims in the background. +func scheduleReclaim(d *DockerWorker, claim *tcqueue.TaskClaimResponse) { + takenUntil := claim.TakenUntil + + go func() { + for { + select { + case <-time.After(time.Time(takenUntil).Sub(time.Now())): + reclaim, err := d.Queue.ReclaimTask(claim.Status.TaskID, fmt.Sprint(claim.RunID)) + if err != nil { + d.TaskLogger.Printf("Exiting reclaim loop: %v", err) + return + } + takenUntil = reclaim.TakenUntil + case <-d.Context.Done(): + return + } + } + }() +} + +// uploadArtifact uploads a new artifact to the task +func uploadArtifact(d *DockerWorker, taskID, runID, name string, in io.Reader) (ret error) { + client := artifact.New(d.Queue) + + tmp, ret := ioutil.TempFile(os.TempDir(), slugid.Nice()) + if ret != nil { + return + } + defer tmp.Close() + defer os.Remove(tmp.Name()) + + // Use a temporary file because we need a ReadSeeker interface + if _, ret = io.Copy(tmp, in); ret != nil { + return + } + + backoffErr := backoff.Retry(func() error { + if _, err := tmp.Seek(0, io.SeekStart); err != nil { + ret = err + return nil + } + + f, err := ioutil.TempFile(os.TempDir(), "gw") + if err != nil { + return err + } + + defer os.Remove(f.Name()) + defer f.Close() + + return client.Upload(taskID, runID, name, tmp, f, false, false) + }, backoff.WithMaxRetries(backoff.WithContext(backoff.NewExponentialBackOff(), d.Context), 3)) + + if ret == nil { + ret = backoffErr + } + + return +} + +func createDummyTask(d *DockerWorker) (taskID string, err error) { + task := tcqueue.TaskDefinitionRequest{ + Created: tcclient.Time(time.Now()), + Deadline: tcclient.Time(time.Now().Add(10 * time.Minute)), + Expires: tcclient.Time(time.Now().Add(48 * time.Hour)), + Payload: json.RawMessage("{}"), + Metadata: tcqueue.TaskMetadata{ + Description: "generic-worker dummy task", + Name: "generic-worker dummy task", + Owner: "wcosta@mozilla.com", + Source: "https://www.mozilla.org", + }, + Priority: "lowest", + ProvisionerID: TestProvisionerID, + WorkerType: TestWorkerType, + SchedulerID: TestSchedulerID, + } + + taskID = slugid.Nice() + + if _, err = d.Queue.CreateTask(taskID, &task); err != nil { + return + } + + fmt.Printf("Created task %s\n", taskID) + + claimResp, err := d.Queue.ClaimTask(taskID, "0", &tcqueue.TaskClaimRequest{ + WorkerGroup: TestWorkerGroup, + WorkerID: TestWorkerID, + }) + + if err != nil { + return + } + + scheduleReclaim(d, claimResp) + + return +} + +func createDummyArtifact(d *DockerWorker, taskID, name, content string) error { + return uploadArtifact(d, taskID, "0", name, strings.NewReader(content)) +} diff --git a/imageid_test.go b/imageid_test.go new file mode 100644 index 00000000..3aa35731 --- /dev/null +++ b/imageid_test.go @@ -0,0 +1,9 @@ +package main + +import "github.com/taskcluster/slugid-go/slugid" + +var ( + // all tests can share taskGroupId so we can view all test tasks in same + // graph later for troubleshooting + taskGroupID = slugid.Nice() +) diff --git a/intermittent_task_test.go b/intermittent_task_test.go index 4209db9b..d9af9f78 100644 --- a/intermittent_task_test.go +++ b/intermittent_task_test.go @@ -1,3 +1,5 @@ +// +build !docker + package main import ( diff --git a/main.go b/main.go index 0f5fe4fa..5e30f833 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,6 @@ //go:generate gw-codegen file://all-unix-style.yml generated_all-unix-style.go !windows //go:generate gw-codegen file://windows.yml generated_windows.go -//go:generate gw-codegen https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/payload.json dockerworker/payload.go +//go:generate gw-codegen file://dockerworker/payload.json dockerworker/payload.go docker package main @@ -28,7 +28,6 @@ import ( "github.com/taskcluster/taskcluster-client-go/tcawsprovisioner" "github.com/taskcluster/taskcluster-client-go/tcpurgecache" "github.com/taskcluster/taskcluster-client-go/tcqueue" - "github.com/xeipuuv/gojsonschema" ) var ( @@ -809,75 +808,6 @@ func ClaimWork() *TaskRun { } } -func (task *TaskRun) validatePayload() *CommandExecutionError { - jsonPayload := task.Definition.Payload - log.Printf("JSON payload: %s", jsonPayload) - schemaLoader := gojsonschema.NewStringLoader(taskPayloadSchema()) - docLoader := gojsonschema.NewStringLoader(string(jsonPayload)) - result, err := gojsonschema.Validate(schemaLoader, docLoader) - if err != nil { - return MalformedPayloadError(err) - } - if !result.Valid() { - task.Error("TASK FAIL since the task payload is invalid. See errors:") - for _, desc := range result.Errors() { - task.Errorf("- %s", desc) - } - // Dealing with Invalid Task Payloads - // ---------------------------------- - // If the task payload is malformed or invalid, keep in mind that the - // queue doesn't validate the contents of the `task.payload` property, - // the worker may resolve the current run by reporting an exception. - // When reporting an exception, using `tcqueue.ReportException` the - // worker should give a `reason`. If the worker is unable execute the - // task specific payload/code/logic, it should report exception with - // the reason `malformed-payload`. - // - // This can also be used if an external resource that is referenced in - // a declarative nature doesn't exist. Generally, it should be used if - // we can be certain that another run of the task will have the same - // result. This differs from `tcqueue.ReportFailed` in the sense that we - // report a failure if the task specific code failed. - // - // Most tasks includes a lot of declarative steps, such as poll a - // docker image, create cache folder, decrypt encrypted environment - // variables, set environment variables and etc. Clearly, if decryption - // of environment variables fail, there is no reason to retry the task. - // Nor can it be said that the task failed, because the error wasn't - // cause by execution of Turing complete code. - // - // If however, we run some executable code referenced in `task.payload` - // and the code crashes or exists non-zero, then the task is said to be - // failed. The difference is whether or not the unexpected behavior - // happened before or after the execution of task specific Turing - // complete code. - return MalformedPayloadError(fmt.Errorf("Validation of payload failed for task %v", task.TaskID)) - } - err = json.Unmarshal(jsonPayload, &task.Payload) - if err != nil { - return MalformedPayloadError(err) - } - for _, artifact := range task.Payload.Artifacts { - // The default artifact expiry is task expiry, but is only applied when - // the task artifacts are resolved. We intentionally don't modify - // task.Payload otherwise it no longer reflects the real data defined - // in the task. - if !time.Time(artifact.Expires).IsZero() { - // Don't be too strict: allow 1s discrepancy to account for - // possible timestamp rounding on upstream systems - if time.Time(artifact.Expires).Add(time.Second).Before(time.Time(task.Definition.Deadline)) { - return MalformedPayloadError(fmt.Errorf("Malformed payload: artifact '%v' expires before task deadline (%v is before %v)", artifact.Path, artifact.Expires, task.Definition.Deadline)) - } - // Don't be too strict: allow 1s discrepancy to account for - // possible timestamp rounding on upstream systems - if time.Time(artifact.Expires).After(time.Time(task.Definition.Expires).Add(time.Second)) { - return MalformedPayloadError(fmt.Errorf("Malformed payload: artifact '%v' expires after task expiry (%v is after %v)", artifact.Path, artifact.Expires, task.Definition.Expires)) - } - } - } - return nil -} - type CommandExecutionError struct { TaskStatus TaskStatus Cause error diff --git a/main_test.go b/main_test.go index b512c001..6b6fa421 100644 --- a/main_test.go +++ b/main_test.go @@ -1,3 +1,5 @@ +// +build !docker + package main import ( diff --git a/mounts_test.go b/mounts_test.go index d3ac4b3a..db2fc2c4 100644 --- a/mounts_test.go +++ b/mounts_test.go @@ -1,3 +1,5 @@ +// +build !docker + package main import ( diff --git a/os_groups_all-unix-style_test.go b/os_groups_all-unix-style_test.go index f58db001..5d8f9578 100644 --- a/os_groups_all-unix-style_test.go +++ b/os_groups_all-unix-style_test.go @@ -1,4 +1,4 @@ -// +build !windows +// +build !windows,!docker package main diff --git a/os_groups_windows_test.go b/os_groups_windows_test.go index 99da5b2c..b41da91d 100644 --- a/os_groups_windows_test.go +++ b/os_groups_windows_test.go @@ -1,3 +1,5 @@ +// +build !docker + package main import ( diff --git a/payload_test.go b/payload_test.go index bf56cea9..bb58e744 100644 --- a/payload_test.go +++ b/payload_test.go @@ -1,3 +1,5 @@ +// +build !docker + package main import ( diff --git a/plat_all-unix-style.go b/plat_all-unix-style.go index be0abc97..7215bf4f 100644 --- a/plat_all-unix-style.go +++ b/plat_all-unix-style.go @@ -1,4 +1,4 @@ -// +build !windows +// +build !windows,!docker package main diff --git a/plat_darwin.go b/plat_darwin.go index c76555e9..0d9db925 100644 --- a/plat_darwin.go +++ b/plat_darwin.go @@ -1,3 +1,5 @@ +// +build !docker + package main import ( diff --git a/plat_docker.go b/plat_docker.go new file mode 100644 index 00000000..a49799b8 --- /dev/null +++ b/plat_docker.go @@ -0,0 +1,145 @@ +// +build docker,!windows + +package main + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "runtime" + "strconv" + "time" + + "github.com/taskcluster/generic-worker/dockerworker" + "github.com/taskcluster/generic-worker/process" + "github.com/taskcluster/shell" +) + +func (task *TaskRun) EnvVars() []string { + env := make([]string, 0, len(task.Payload.Env)) + + for key, value := range task.Payload.Env { + env = append(env, fmt.Sprintf("%s=%s", key, value)) + } + + return env +} + +func platformFeatures() []Feature { + return []Feature{} +} + +type PlatformData struct { + Image json.RawMessage +} + +type TaskContext struct { + TaskDir string +} + +func (task *TaskRun) NewPlatformData() (pd *PlatformData, err error) { + return &PlatformData{}, nil +} + +func (pd *PlatformData) ReleaseResources() error { + return nil +} + +func immediateShutdown(cause string) { +} + +func immediateReboot() { +} + +func init() { + pwd, err := os.Getwd() + if err != nil { + panic(err) + } + + taskContext = &TaskContext{ + TaskDir: pwd, + } +} + +func (task *TaskRun) prepareCommand(index int) error { + return nil +} + +func (task *TaskRun) generateCommand(index int) error { + var err error + task.Commands[index], err = process.NewCommand( + dockerworker.New(context.Background(), task.Queue, task.TaskID, task.logWriter), + task.Payload.Command[index], + task.PlatformData.Image, + taskContext.TaskDir, + task.EnvVars(), + ) + return err +} + +func purgeOldTasks() error { + return nil +} + +func install(arguments map[string]interface{}) (err error) { + return nil +} + +func makeDirReadableForTaskUser(task *TaskRun, dir string) error { + // No user separation yet + return nil +} + +func makeDirUnreadableForTaskUser(task *TaskRun, dir string) error { + // No user separation yet + return nil +} + +func RenameCrossDevice(oldpath, newpath string) error { + return nil +} + +func (task *TaskRun) formatCommand(index int) string { + return shell.Escape(task.Payload.Command[index]...) +} + +func prepareTaskUser(username string) bool { + return false +} + +func deleteTaskDir(path string) error { + return nil +} + +func defaultTasksDir() string { + // assume all user home directories are all in same folder, i.e. the parent + // folder of the current user's home folder + return filepath.Dir(os.Getenv("HOME")) +} + +// N/A for unix - just a windows thing +func AutoLogonCredentials() (string, string) { + return "", "" +} + +func chooseTaskDirName() string { + return "task_" + strconv.Itoa(int(time.Now().Unix())) +} + +func unsetAutoLogon() { +} + +func deleteTaskDirs() { + //removeTaskDirs(config.TasksDir) +} + +func GrantSIDFullControlOfInteractiveWindowsStationAndDesktop(sid string) (err error) { + return fmt.Errorf( + "Cannot grant %v full control of interactive windows station and desktop; platform %v does not have such entities", + sid, + runtime.GOOS, + ) +} diff --git a/plat_windows.go b/plat_windows.go index 25839b70..7c4dde05 100644 --- a/plat_windows.go +++ b/plat_windows.go @@ -1,3 +1,5 @@ +// +build !docker + package main import ( diff --git a/process/logininfo_windows.go b/process/logininfo_windows.go index f70464b0..0336dd8d 100644 --- a/process/logininfo_windows.go +++ b/process/logininfo_windows.go @@ -1,3 +1,5 @@ +// +build !docker + package process import ( diff --git a/process/process_darwin.go b/process/process_all-unix-style.go similarity index 99% rename from process/process_darwin.go rename to process/process_all-unix-style.go index 3e55e90c..d4ab882a 100644 --- a/process/process_darwin.go +++ b/process/process_all-unix-style.go @@ -1,3 +1,5 @@ +// +build !windows,!docker + package process import ( diff --git a/process/process_docker.go b/process/process_docker.go new file mode 100644 index 00000000..4ac4b3f0 --- /dev/null +++ b/process/process_docker.go @@ -0,0 +1,152 @@ +// +build docker + +package process + +import ( + "encoding/json" + "errors" + "fmt" + "io" + "os" + "sync" + "time" + + docker "github.com/fsouza/go-dockerclient" + "github.com/taskcluster/generic-worker/dockerworker" +) + +type Result struct { + SystemError error + ExitError error + exitCode int + Duration time.Duration +} + +type Command struct { + mutex sync.RWMutex + worker *dockerworker.DockerWorker + writer io.Writer + cmd []string + workingDirectory string + image json.RawMessage + env []string +} + +func (c *Command) ensureImage() (img *docker.Image, err error) { + var imageName string + var imageArtifact dockerworker.DockerImageArtifact + var indexedImage dockerworker.IndexedDockerImage + + if err = json.Unmarshal(c.image, &imageName); err == nil { + img, err = c.worker.LoadImage(imageName) + } else if err = json.Unmarshal(c.image, &imageArtifact); err == nil { + img, err = c.worker.LoadArtifactImage(imageArtifact.TaskID, "", imageArtifact.Path) + } else if err = json.Unmarshal(c.image, &indexedImage); err == nil { + img, err = c.worker.LoadIndexedImage(indexedImage.Namespace, indexedImage.Path) + } else { + err = errors.New("Invalid image specification") + } + + return +} + +func (c *Command) DirectOutput(writer io.Writer) { + c.writer = writer +} + +func (c *Command) String() string { + return fmt.Sprintf("%q", c.cmd) +} + +func (c *Command) Execute() (r *Result) { + r = &Result{} + + c.mutex.Lock() + defer c.mutex.Unlock() + + image, err := c.ensureImage() + if err != nil { + r.SystemError = fmt.Errorf("Error downloading image: %v", err) + return + } + + container, err := c.worker.CreateContainer(c.env, image, c.cmd, false) + if err != nil { + r.SystemError = fmt.Errorf("Error creating a new container: %v", err) + return + } + + if err = c.worker.Client.StartContainerWithContext(container.ID, nil, c.worker.Context); err != nil { + r.ExitError = fmt.Errorf("Error starting container: %v", err) + return + } + + started := time.Now() + if r.exitCode, err = c.worker.Client.WaitContainer(container.ID); err != nil { + r.ExitError = fmt.Errorf("Error wating for container to finish: %v", err) + return + } + + err = c.worker.Client.Logs(docker.LogsOptions{ + Context: c.worker.Context, + Container: container.ID, + OutputStream: c.worker.LivelogWriter, + ErrorStream: c.worker.LivelogWriter, + Stdout: true, + Stderr: true, + RawTerminal: true, + Timestamps: true, + }) + if err != nil { + r.SystemError = fmt.Errorf("Error pulling container logs: %v", err) + } + + finished := time.Now() + r.Duration = finished.Sub(started) + + return +} + +func (r *Result) CrashCause() error { + return r.SystemError +} + +func (r *Result) Crashed() bool { + return r.SystemError != nil +} + +func (r *Result) FailureCause() error { + if r.ExitError == nil { + return fmt.Errorf("Exit code %v", r.exitCode) + } + + return r.ExitError +} + +func (r *Result) Failed() bool { + return r.SystemError == nil && (r.exitCode != 0 || r.ExitError != nil) +} + +func (r *Result) ExitCode() int { + if r.SystemError != nil || r.ExitError != nil { + return -2 + } + + return r.exitCode +} + +func NewCommand(worker *dockerworker.DockerWorker, commandLine []string, image json.RawMessage, workingDirectory string, env []string) (*Command, error) { + c := &Command{ + worker: worker, + writer: os.Stdout, + cmd: commandLine, + workingDirectory: workingDirectory, + image: image, + env: env, + } + return c, nil +} + +func (c *Command) Kill() ([]byte, error) { + return []byte{}, nil +} diff --git a/process/process_docker_test.go b/process/process_docker_test.go new file mode 100644 index 00000000..191510fb --- /dev/null +++ b/process/process_docker_test.go @@ -0,0 +1,52 @@ +// +build docker + +package process + +import ( + "encoding/json" + "fmt" + "io" + "strings" + "testing" + + docker "github.com/fsouza/go-dockerclient" + "github.com/mattetti/filebuffer" + "github.com/stretchr/testify/require" + "github.com/taskcluster/generic-worker/dockerworker" +) + +func TestProcess(t *testing.T) { + const message = "Would you kindly?" + const imageName = "ubuntu:14.04" + + image, err := json.Marshal(imageName) + require.NoError(t, err) + + d := dockerworker.NewTestDockerWorker(t) + if _, err = d.Client.InspectImage(imageName); err == nil { + require.NoError(t, d.Client.RemoveImageExtended(imageName, docker.RemoveImageOptions{ + Force: true, + Context: d.Context, + })) + } + buff := filebuffer.New([]byte{}) + d.LivelogWriter = buff + + c, err := NewCommand( + d, + []string{"/bin/bash", "-c", "echo $MESSAGE"}, + image, + "/", + []string{fmt.Sprintf("MESSAGE='%s'", message)}, + ) + require.NoError(t, err) + + r := c.Execute() + require.NoError(t, r.SystemError) + require.Equal(t, r.ExitCode(), 0) + + _, err = buff.Seek(0, io.SeekStart) + require.NoError(t, err) + + require.True(t, strings.Contains(buff.String(), message)) +} diff --git a/process/process_linux.go b/process/process_linux.go deleted file mode 100644 index abb7aa3f..00000000 --- a/process/process_linux.go +++ /dev/null @@ -1,139 +0,0 @@ -package process - -import ( - "fmt" - "io" - "os" - "sync" - "time" - - "github.com/docker/docker/api/types" - "github.com/docker/docker/api/types/container" - "github.com/docker/docker/client" - "golang.org/x/net/context" -) - -type Result struct { - SystemError error - ExitCode int64 - Duration time.Duration -} - -type Command struct { - mutex sync.RWMutex - ctx context.Context - cli *client.Client - resp container.ContainerCreateCreatedBody - writer io.Writer - cmd []string - workingDirectory string - env []string -} - -var cli *client.Client - -func init() { - var err error - cli, err = client.NewClientWithOpts(client.WithVersion("1.24")) - if err != nil { - panic(err) - } -} - -func (c *Command) DirectOutput(writer io.Writer) { - c.writer = writer -} - -func (c *Command) String() string { - return fmt.Sprintf("%q", c.cmd) -} - -func (c *Command) Execute() (r *Result) { - - var outPull io.ReadCloser - r = &Result{} - c.mutex.Lock() - defer c.mutex.Unlock() - outPull, r.SystemError = cli.ImagePull(c.ctx, "ubuntu", types.ImagePullOptions{}) - if r.SystemError != nil { - return - } - defer outPull.Close() - io.Copy(c.writer, outPull) - - c.resp, r.SystemError = c.cli.ContainerCreate( - c.ctx, - &container.Config{ - Image: "ubuntu", - Cmd: c.cmd, - WorkingDir: c.workingDirectory, - Env: c.env, - }, - nil, - nil, - "", - ) - if r.SystemError != nil { - return - } - - r.SystemError = c.cli.ContainerStart(c.ctx, c.resp.ID, types.ContainerStartOptions{}) - if r.SystemError != nil { - return - } - - started := time.Now() - res, errch := c.cli.ContainerWait(c.ctx, c.resp.ID, container.WaitConditionNotRunning) - select { - case r.SystemError = <-errch: - if r.SystemError != nil { - return - } - case exitCode := <-res: - r.ExitCode = exitCode.StatusCode - } - - var outLogs io.ReadCloser - outLogs, r.SystemError = c.cli.ContainerLogs(c.ctx, c.resp.ID, types.ContainerLogsOptions{ShowStdout: true}) - if r.SystemError != nil { - return - } - defer outLogs.Close() - io.Copy(c.writer, outLogs) - - finished := time.Now() - r.Duration = finished.Sub(started) - return -} - -func (r *Result) CrashCause() error { - return r.SystemError -} - -func (r *Result) Crashed() bool { - return r.SystemError != nil -} - -func (r *Result) FailureCause() error { - return fmt.Errorf("Exit code %v", r.ExitCode) -} - -func (r *Result) Failed() bool { - return r.ExitCode != 0 -} - -func NewCommand(commandLine []string, workingDirectory string, env []string) (*Command, error) { - c := &Command{ - ctx: context.Background(), - writer: os.Stdout, - cmd: commandLine, - workingDirectory: workingDirectory, - cli: cli, - env: env, - } - return c, nil -} - -func (c *Command) Kill() error { - return nil -} diff --git a/process/process_windows.go b/process/process_windows.go index ea9a52dc..b184969e 100644 --- a/process/process_windows.go +++ b/process/process_windows.go @@ -1,3 +1,5 @@ +// +build !docker + package process import ( diff --git a/supersede_test.go b/supersede_test.go index 4f6b9eba..e6ce2765 100644 --- a/supersede_test.go +++ b/supersede_test.go @@ -1,3 +1,5 @@ +// +build !docker + package main import ( diff --git a/taskcluster_proxy_test.go b/taskcluster_proxy_test.go index 71b533ab..4b749abd 100644 --- a/taskcluster_proxy_test.go +++ b/taskcluster_proxy_test.go @@ -1,3 +1,5 @@ +// +build !docker + package main import ( diff --git a/taskstatus_test.go b/taskstatus_test.go index 5b3225df..4553e021 100644 --- a/taskstatus_test.go +++ b/taskstatus_test.go @@ -1,3 +1,5 @@ +// +build !docker + package main import ( diff --git a/validate.go b/validate.go new file mode 100644 index 00000000..5e325f55 --- /dev/null +++ b/validate.go @@ -0,0 +1,81 @@ +// +build !docker + +package main + +import ( + "encoding/json" + "fmt" + "log" + "time" + + "github.com/xeipuuv/gojsonschema" +) + +func (task *TaskRun) validatePayload() *CommandExecutionError { + jsonPayload := task.Definition.Payload + log.Printf("JSON payload: %s", jsonPayload) + schemaLoader := gojsonschema.NewStringLoader(taskPayloadSchema()) + docLoader := gojsonschema.NewStringLoader(string(jsonPayload)) + result, err := gojsonschema.Validate(schemaLoader, docLoader) + if err != nil { + return MalformedPayloadError(err) + } + if !result.Valid() { + task.Error("TASK FAIL since the task payload is invalid. See errors:") + for _, desc := range result.Errors() { + task.Errorf("- %s", desc) + } + // Dealing with Invalid Task Payloads + // ---------------------------------- + // If the task payload is malformed or invalid, keep in mind that the + // queue doesn't validate the contents of the `task.payload` property, + // the worker may resolve the current run by reporting an exception. + // When reporting an exception, using `tcqueue.ReportException` the + // worker should give a `reason`. If the worker is unable execute the + // task specific payload/code/logic, it should report exception with + // the reason `malformed-payload`. + // + // This can also be used if an external resource that is referenced in + // a declarative nature doesn't exist. Generally, it should be used if + // we can be certain that another run of the task will have the same + // result. This differs from `tcqueue.ReportFailed` in the sense that we + // report a failure if the task specific code failed. + // + // Most tasks includes a lot of declarative steps, such as poll a + // docker image, create cache folder, decrypt encrypted environment + // variables, set environment variables and etc. Clearly, if decryption + // of environment variables fail, there is no reason to retry the task. + // Nor can it be said that the task failed, because the error wasn't + // cause by execution of Turing complete code. + // + // If however, we run some executable code referenced in `task.payload` + // and the code crashes or exists non-zero, then the task is said to be + // failed. The difference is whether or not the unexpected behavior + // happened before or after the execution of task specific Turing + // complete code. + return MalformedPayloadError(fmt.Errorf("Validation of payload failed for task %v", task.TaskID)) + } + err = json.Unmarshal(jsonPayload, &task.Payload) + if err != nil { + return MalformedPayloadError(err) + } + for _, artifact := range task.Payload.Artifacts { + // The default artifact expiry is task expiry, but is only applied when + // the task artifacts are resolved. We intentionally don't modify + // task.Payload otherwise it no longer reflects the real data defined + // in the task. + if !time.Time(artifact.Expires).IsZero() { + // Don't be too strict: allow 1s discrepancy to account for + // possible timestamp rounding on upstream systems + if time.Time(artifact.Expires).Add(time.Second).Before(time.Time(task.Definition.Deadline)) { + return MalformedPayloadError(fmt.Errorf("Malformed payload: artifact '%v' expires before task deadline (%v is before %v)", artifact.Path, artifact.Expires, task.Definition.Deadline)) + } + // Don't be too strict: allow 1s discrepancy to account for + // possible timestamp rounding on upstream systems + if time.Time(artifact.Expires).After(time.Time(task.Definition.Expires).Add(time.Second)) { + return MalformedPayloadError(fmt.Errorf("Malformed payload: artifact '%v' expires after task expiry (%v is after %v)", artifact.Path, artifact.Expires, task.Definition.Expires)) + } + } + } + return nil +} diff --git a/validate_docker.go b/validate_docker.go new file mode 100644 index 00000000..e1d282bc --- /dev/null +++ b/validate_docker.go @@ -0,0 +1,59 @@ +// +build docker + +package main + +import ( + "encoding/json" + "fmt" + + "github.com/taskcluster/generic-worker/dockerworker" +) + +func dockerWorkerToGenericWorkerPayload(dw *dockerworker.DockerWorkerPayload) *GenericWorkerPayload { + artifacts := make([]Artifact, 0, len(dw.Artifacts)) + for name, artifact := range dw.Artifacts { + artifacts = append(artifacts, Artifact{ + Expires: artifact.Expires, + Name: name, + Path: artifact.Path, + Type: artifact.Type, + }) + } + + return &GenericWorkerPayload{ + Artifacts: artifacts, + Command: [][]string{dw.Command}, + Env: dw.Env, + MaxRunTime: int64(dw.MaxRunTime), + SupersederURL: dw.SupersederURL, + Features: FeatureFlags{}, + Mounts: []json.RawMessage{}, + OnExitStatus: ExitCodeHandling{}, + OSGroups: []string{}, + } +} + +func (task *TaskRun) validatePayload() *CommandExecutionError { + result, err := dockerworker.ValidatePayload(task.Definition.Payload) + if err != nil { + return MalformedPayloadError(err) + } + + if !result.Valid() { + task.Error("TASK FAIL since the task payload is invalid. See errors:") + for _, desc := range result.Errors() { + task.Errorf("- %s", desc) + } + return MalformedPayloadError(fmt.Errorf("Validation of payload failed for task %v", task.TaskID)) + } + + var payload dockerworker.DockerWorkerPayload + if err = json.Unmarshal(task.Definition.Payload, &payload); err != nil { + return MalformedPayloadError(err) + } + + task.PlatformData.Image = payload.Image + task.Payload = *dockerWorkerToGenericWorkerPayload(&payload) + + return nil +}