From dae6d63755fdd4c1d4f60039d2f1f6fbe7b5bb80 Mon Sep 17 00:00:00 2001 From: Wander Lairson Costa Date: Tue, 16 Oct 2018 18:04:32 -0300 Subject: [PATCH] Bug 1502337: Initial support for docker-worker payloads This commit implements the initial support for docker-worker payloads. So far, only the Image, Command, Env and MaxRunTime fields are supported. It hasn't been extensive tested so features like task cancellation may not work. Also, it doesn't implement custom artifacts and more extensive tests. The support for docker-worker payloads is achieved through build flags. To build with docker-worker paylod, build it with the "docker" flag, like so: $ go build -tags=docker Notice that generic-worker will not be supported in this build. We also needed to upgrade golang due to a bug in the go compiler [1]. [1] https://github.com/golang/go/issues/25908 --- .gitignore | 3 + .taskcluster.yml | 76 ++++----- .travis.yml | 2 +- artifacts_test.go | 9 +- aws_test.go | 2 + chain_of_trust_all-unix-style.go | 2 +- chain_of_trust_docker.go | 11 ++ docker_linux.go | 46 ------ dockerworker/artifact.go | 151 +++++++++++++++++ dockerworker/artifact_test.go | 36 ++++ dockerworker/dockerimage.go | 145 ++++++++++++++++ dockerworker/dockerimage_test.go | 62 +++++++ dockerworker/dockerworker.go | 70 ++++++++ dockerworker/log.go | 20 +++ dockerworker/renameimage.go | 155 ++++++++++++++++++ dockerworker/renameimage_test.go | 33 ++++ dockerworker/tarstream.go | 67 ++++++++ dockerworker/tarstream_test.go | 98 +++++++++++ dockerworker/util.go | 20 +++ dockerworker/util_test.go | 58 +++++++ imageid_test.go | 9 + intermittent_task_test.go | 2 + main.go | 72 +------- main_test.go | 2 + mounts_test.go | 2 + os_groups_all-unix-style_test.go | 2 +- os_groups_windows_test.go | 2 + payload_test.go | 2 + plat_all-unix-style.go | 2 +- plat_darwin.go | 2 + plat_docker.go | 153 +++++++++++++++++ plat_windows.go | 2 + process/logininfo_windows.go | 2 + ...ss_darwin.go => process_all-unix-style.go} | 2 + process/process_docker.go | 141 ++++++++++++++++ process/process_docker_test.go | 52 ++++++ process/process_linux.go | 139 ---------------- process/process_windows.go | 2 + supersede_test.go | 2 + taskcluster_proxy_test.go | 2 + taskstatus_test.go | 2 + validate.go | 81 +++++++++ validate_docker.go | 58 +++++++ 43 files changed, 1496 insertions(+), 305 deletions(-) create mode 100644 chain_of_trust_docker.go delete mode 100644 docker_linux.go create mode 100644 dockerworker/artifact.go create mode 100644 dockerworker/artifact_test.go create mode 100644 dockerworker/dockerimage.go create mode 100644 dockerworker/dockerimage_test.go create mode 100644 dockerworker/dockerworker.go create mode 100644 dockerworker/log.go create mode 100644 dockerworker/renameimage.go create mode 100644 dockerworker/renameimage_test.go create mode 100644 dockerworker/tarstream.go create mode 100644 dockerworker/tarstream_test.go create mode 100644 dockerworker/util.go create mode 100644 dockerworker/util_test.go create mode 100644 imageid_test.go create mode 100644 plat_docker.go rename process/{process_darwin.go => process_all-unix-style.go} (99%) create mode 100644 process/process_docker.go create mode 100644 process/process_docker_test.go delete mode 100644 process/process_linux.go create mode 100644 validate.go create mode 100644 validate_docker.go diff --git a/.gitignore b/.gitignore index 6ce29864..8f466748 100644 --- a/.gitignore +++ b/.gitignore @@ -47,3 +47,6 @@ directory-caches.json file-caches.json tasks-resolved-count.txt /revision.txt + +# binary file +generic-worker diff --git a/.taskcluster.yml b/.taskcluster.yml index 522e04d9..155d9f68 100644 --- a/.taskcluster.yml +++ b/.taskcluster.yml @@ -70,8 +70,8 @@ tasks: maxRunTime: 3600 command: - set CGO_ENABLED=0 - - set GOPATH=%CD%\gopath1.10.3 - - set GOROOT=%CD%\go1.10.3\go + - set GOPATH=%CD%\gopath1.10.4 + - set GOROOT=%CD%\go1.10.4\go - set PATH=%CD%\git\cmd;%GOPATH%\bin;%GOROOT%\bin;%PATH% - git config --global core.autocrlf false - go version @@ -111,16 +111,16 @@ tasks: - ineffassign . artifacts: - name: public/build/generic-worker-windows-amd64.exe - path: gopath1.10.3\bin\generic-worker.exe + path: gopath1.10.4\bin\generic-worker.exe expires: "{{ '2 weeks' | $fromNow }}" type: file mounts: - cacheName: generic-worker-checkout - directory: gopath1.10.3\src + directory: gopath1.10.4\src - content: - url: https://storage.googleapis.com/golang/go1.10.3.windows-amd64.zip - sha256: a3f19d4fc0f4b45836b349503e347e64e31ab830dedac2fc9c390836d4418edb - directory: go1.10.3 + url: https://storage.googleapis.com/golang/go1.10.4.windows-amd64.zip + sha256: 5499aa98399664df8dc1da5c3aaaed14b3130b79c713b5677a0ee9e93854476c + directory: go1.10.4 format: zip - content: url: https://github.com/git-for-windows/git/releases/download/v2.14.1.windows.1/MinGit-2.14.1-64-bit.zip @@ -152,8 +152,8 @@ tasks: maxRunTime: 3600 command: - set CGO_ENABLED=0 - - set GOPATH=%CD%\gopath1.10.3 - - set GOROOT=%CD%\go1.10.3\go + - set GOPATH=%CD%\gopath1.10.4 + - set GOROOT=%CD%\go1.10.4\go - set PATH=%CD%\git\bin;%GOPATH%\bin;%GOROOT%\bin;%PATH% - git config --global core.autocrlf false - go version @@ -182,16 +182,16 @@ tasks: - ineffassign . artifacts: - name: public/build/generic-worker-windows-386.exe - path: gopath1.10.3\bin\generic-worker.exe + path: gopath1.10.4\bin\generic-worker.exe expires: "{{ '2 weeks' | $fromNow }}" type: file mounts: - cacheName: generic-worker-checkout - directory: gopath1.10.3\src + directory: gopath1.10.4\src - content: - url: https://storage.googleapis.com/golang/go1.10.3.windows-386.zip - sha256: 89696a29bdf808fa9861216a21824ae8eb2e750a54b1424ce7f2a177e5cd1466 - directory: go1.10.3 + url: https://storage.googleapis.com/golang/go1.10.4.windows-386.zip + sha256: 407e5619048c427de4a65b26edb17d54c220f8c30ebd358961b1785a38394ec9 + directory: go1.10.4 format: zip - content: url: https://github.com/git-for-windows/git/releases/download/v2.11.0.windows.3/Git-2.11.0.3-32-bit.tar.bz2 @@ -223,8 +223,8 @@ tasks: maxRunTime: 3600 command: - set CGO_ENABLED=0 - - set GOPATH=%CD%\gopath1.10.3 - - set GOROOT=%CD%\go1.10.3\go + - set GOPATH=%CD%\gopath1.10.4 + - set GOROOT=%CD%\go1.10.4\go - set PATH=%CD%\git\cmd;%GOPATH%\bin;%GOROOT%\bin;%PATH% - git config --global core.autocrlf false - go version @@ -264,16 +264,16 @@ tasks: - ineffassign . artifacts: - name: public/build/generic-worker-windows-amd64.exe - path: gopath1.10.3\bin\generic-worker.exe + path: gopath1.10.4\bin\generic-worker.exe expires: "{{ '2 weeks' | $fromNow }}" type: file mounts: - cacheName: generic-worker-checkout - directory: gopath1.10.3\src + directory: gopath1.10.4\src - content: - url: https://storage.googleapis.com/golang/go1.10.3.windows-amd64.zip - sha256: a3f19d4fc0f4b45836b349503e347e64e31ab830dedac2fc9c390836d4418edb - directory: go1.10.3 + url: https://storage.googleapis.com/golang/go1.10.4.windows-amd64.zip + sha256: 5499aa98399664df8dc1da5c3aaaed14b3130b79c713b5677a0ee9e93854476c + directory: go1.10.4 format: zip - content: url: https://github.com/git-for-windows/git/releases/download/v2.14.1.windows.1/MinGit-2.14.1-64-bit.zip @@ -308,8 +308,8 @@ tasks: - -vxec - | export CGO_ENABLED=0 - export GOROOT="$(pwd)/go1.10.3/go" - export GOPATH="$(pwd)/gopath1.10.3" + export GOROOT="$(pwd)/go1.10.4/go" + export GOPATH="$(pwd)/gopath1.10.4" export PATH="${GOPATH}/bin:${GOROOT}/bin:${PATH}" go version go env @@ -335,16 +335,16 @@ tasks: ineffassign . artifacts: - name: public/build/generic-worker-darwin-amd64 - path: gopath1.10.3/bin/generic-worker + path: gopath1.10.4/bin/generic-worker expires: "{{ '2 weeks' | $fromNow }}" type: file mounts: - cacheName: generic-worker-checkout - directory: gopath1.10.3/src + directory: gopath1.10.4/src - content: - url: https://storage.googleapis.com/golang/go1.10.3.darwin-amd64.tar.gz - sha256: 131fd430350a3134d352ee75c5ca456cdf4443e492d0527a9651c7c04e2b458d - directory: go1.10.3 + url: https://storage.googleapis.com/golang/go1.10.4.darwin-amd64.tar.gz + sha256: 2ba324f01de2b2ece0376f6d696570a4c5c13db67d00aadfd612adc56feff587 + directory: go1.10.4 format: tar.gz @@ -380,8 +380,8 @@ tasks: # - -vxec # - | # export CGO_ENABLED=0 - # export GOROOT="$(pwd)/go1.10.3/go" - # export GOPATH="$(pwd)/gopath1.10.3" + # export GOROOT="$(pwd)/go1.10.4/go" + # export GOPATH="$(pwd)/gopath1.10.4" # export PATH="${GOPATH}/bin:${GOROOT}/bin:${PATH}" # export CGO_ENABLED=0 # go version @@ -407,16 +407,16 @@ tasks: # ineffassign . # artifacts: # - name: public/build/generic-worker-linux-armv6l - # path: gopath1.10.3/bin/generic-worker + # path: gopath1.10.4/bin/generic-worker # expires: "{{ '2 weeks' | $fromNow }}" # type: file # mounts: # - cacheName: generic-worker-checkout - # directory: gopath1.10.3/src + # directory: gopath1.10.4/src # - content: - # url: https://storage.googleapis.com/golang/go1.10.3.linux-armv6l.tar.gz + # url: https://storage.googleapis.com/golang/go1.10.4.linux-armv6l.tar.gz # sha256: d3df3fa3d153e81041af24f31a82f86a21cb7b92c1b5552fb621bad0320f06b6 - # directory: go1.10.3 + # directory: go1.10.4 # format: tar.gz @@ -472,13 +472,13 @@ tasks: "${GOPATH}/bin/ineffassign" . artifacts: - name: public/build/generic-worker-linux-amd64 - path: gopath1.10.1/bin/generic-worker + path: gopath1.10.4/bin/generic-worker expires: "{{ '2 weeks' | $fromNow }}" type: file mounts: - cacheName: generic-worker-checkout - directory: gopath1.10.1/src + directory: gopath1.10.4/src - content: - url: https://storage.googleapis.com/golang/go1.10.1.linux-amd64.tar.gz - directory: go1.10.1 + url: https://storage.googleapis.com/golang/go1.10.4.linux-amd64.tar.gz + directory: go1.10.4 format: tar.gz diff --git a/.travis.yml b/.travis.yml index 656cca7c..b659a8b9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,6 @@ language: go go: - - "1.10.3" + - "1.10.4" env: - "CGO_ENABLED=0 GIMME_OS=linux GIMME_ARCH=386" diff --git a/artifacts_test.go b/artifacts_test.go index 0dd27ee5..7331d4b1 100644 --- a/artifacts_test.go +++ b/artifacts_test.go @@ -1,3 +1,5 @@ +// +build !docker + package main import ( @@ -13,17 +15,10 @@ import ( "golang.org/x/crypto/openpgp" "golang.org/x/crypto/openpgp/clearsign" - "github.com/taskcluster/slugid-go/slugid" tcclient "github.com/taskcluster/taskcluster-client-go" "github.com/taskcluster/taskcluster-client-go/tcqueue" ) -var ( - // all tests can share taskGroupId so we can view all test tasks in same - // graph later for troubleshooting - taskGroupID = slugid.Nice() -) - func validateArtifacts( t *testing.T, payloadArtifacts []Artifact, diff --git a/aws_test.go b/aws_test.go index 4fb21e8a..828892ee 100644 --- a/aws_test.go +++ b/aws_test.go @@ -1,3 +1,5 @@ +// +build !docker + package main import ( diff --git a/chain_of_trust_all-unix-style.go b/chain_of_trust_all-unix-style.go index ee371da3..8a5fa376 100644 --- a/chain_of_trust_all-unix-style.go +++ b/chain_of_trust_all-unix-style.go @@ -1,4 +1,4 @@ -// +build !windows +// +build !windows,!docker package main diff --git a/chain_of_trust_docker.go b/chain_of_trust_docker.go new file mode 100644 index 00000000..c0c20da7 --- /dev/null +++ b/chain_of_trust_docker.go @@ -0,0 +1,11 @@ +// +build docker + +package main + +func (cot *ChainOfTrustTaskFeature) ensureTaskUserCantReadPrivateCotKey() error { + return nil +} + +func secureSigningKey() error { + return nil +} diff --git a/docker_linux.go b/docker_linux.go deleted file mode 100644 index b206cc2c..00000000 --- a/docker_linux.go +++ /dev/null @@ -1,46 +0,0 @@ -package main - -import ( - "github.com/taskcluster/taskcluster-base-go/scopes" -) - -type DockerFeature struct { -} - -func (feature *DockerFeature) Name() string { - return "Docker" -} - -func (feature *DockerFeature) Initialise() error { - return nil -} - -func (feature *DockerFeature) PersistState() error { - return nil -} - -func (feature *DockerFeature) IsEnabled(task *TaskRun) bool { - return true -} - -type DockerTask struct { - task *TaskRun -} - -func (feature *DockerFeature) NewTaskFeature(task *TaskRun) TaskFeature { - return &DockerTask{ - task: task, - } -} - -func (l *DockerTask) RequiredScopes() scopes.Required { - return scopes.Required{} -} - -func (l *DockerTask) Start() *CommandExecutionError { - return nil -} - -func (l *DockerTask) Stop() *CommandExecutionError { - return nil -} diff --git a/dockerworker/artifact.go b/dockerworker/artifact.go new file mode 100644 index 00000000..70c6624d --- /dev/null +++ b/dockerworker/artifact.go @@ -0,0 +1,151 @@ +// +build docker + +package dockerworker + +import ( + "errors" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/url" + "os" + "strconv" + "time" + + "github.com/cenkalti/backoff" + "github.com/mattetti/filebuffer" + "github.com/mitchellh/ioprogress" + "github.com/taskcluster/taskcluster-client-go/tcqueue" + artifact "github.com/taskcluster/taskcluster-lib-artifact-go" +) + +// ScheduleReclaim run a goroutine that will perform reclaims in the background. +func (d *DockerWorker) ScheduleReclaim(claim *tcqueue.TaskClaimResponse) { + takenUntil := claim.TakenUntil + + go func() { + for { + select { + case <-time.After(time.Time(takenUntil).Sub(time.Now())): + reclaim, err := d.queue.ReclaimTask(claim.Status.TaskID, fmt.Sprint(claim.RunID)) + if err != nil { + d.TaskLogger.Printf("Exiting reclaim loop: %v", err) + return + } + takenUntil = reclaim.TakenUntil + case <-d.Context.Done(): + return + } + } + }() +} + +// DownloadArtifact downloads an artifact using exponential backoff algorithm +func (d *DockerWorker) DownloadArtifact(taskID, runID, name string, out io.WriteSeeker) (ret error) { + var u *url.URL + + backoffError := backoff.Retry(func() (err error) { + // rewind out stream + _, ret = out.Seek(0, io.SeekStart) + if ret != nil { + return + } + + if runID == "" { + u, ret = d.queue.GetLatestArtifact_SignedURL(taskID, name, 24*time.Hour) + } else { + u, ret = d.queue.GetArtifact_SignedURL(taskID, runID, name, 24*time.Hour) + } + + if ret != nil { + return + } + + d.TaskLogger.Printf("Downloading %s/%s/%s from %s\n", taskID, runID, name, u.String()) + + // Build a custom request object so we can embed a context into it + var req *http.Request + req, ret = http.NewRequest(http.MethodGet, u.String(), nil) + if ret != nil { + return + } + req = req.WithContext(d.Context) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + fmt.Println(err) + return + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + buff := filebuffer.New([]byte{}) + + // Build the error message with the status code and the response body + errorMessage := fmt.Sprintf("Error downloading %s/%s, status=%s", taskID, name, resp.Status) + if _, err2 := io.Copy(buff, resp.Body); err2 == nil { + errorMessage += "\n" + buff.String() + } + + // For status codes other than 5XX there is no point in issuing a retry + if resp.StatusCode >= 300 && resp.StatusCode < 500 { + ret = errors.New(errorMessage) + } else { + err = errors.New(errorMessage) + } + + return + } + + size, err2 := strconv.Atoi(resp.Header.Get("Content-Length")) + if err2 != nil { + ret = err2 + return + } + + // Depending on the implementation, the body can also issue network requests, + // that's why we need to retry if io.Copy fails + _, err = io.Copy(out, &ioprogress.Reader{ + Reader: resp.Body, + Size: int64(size), + DrawFunc: ioprogress.DrawTerminal(d.LivelogWriter), + }) + + return + }, backoff.WithMaxRetries(backoff.WithContext(backoff.NewExponentialBackOff(), d.Context), 3)) + + if ret == nil { + ret = backoffError + } + + return +} + +// UploadArtifact uploads a new artifact to the task +func (d *DockerWorker) UploadArtifact(taskID, runID, name string, in io.ReadSeeker) (ret error) { + client := artifact.New(d.queue) + + backoffErr := backoff.Retry(func() error { + if _, err := in.Seek(0, io.SeekStart); err != nil { + ret = err + return nil + } + + f, err := ioutil.TempFile(os.TempDir(), "gw") + if err != nil { + return err + } + + defer os.Remove(f.Name()) + defer f.Close() + + return client.Upload(taskID, runID, name, in, f, false, false) + }, backoff.WithMaxRetries(backoff.WithContext(backoff.NewExponentialBackOff(), d.Context), 3)) + + if ret == nil { + ret = backoffErr + } + + return +} diff --git a/dockerworker/artifact_test.go b/dockerworker/artifact_test.go new file mode 100644 index 00000000..c3b2931e --- /dev/null +++ b/dockerworker/artifact_test.go @@ -0,0 +1,36 @@ +// +build docker + +package dockerworker + +import ( + "io" + "testing" + + "github.com/mattetti/filebuffer" + "github.com/stretchr/testify/require" +) + +func TestArtifact(t *testing.T) { + d := NewTestDockerWorker(t) + + content := "This is a dummy artifact" + + taskID, err := createDummyArtifact(d, "public/test", content) + require.NoError(t, err) + + // Test Download Latest + buff := filebuffer.New([]byte{}) + require.NoError(t, d.DownloadArtifact(taskID, "", "public/test", buff)) + + _, err = buff.Seek(0, io.SeekStart) + require.NoError(t, err) + require.Equal(t, content, buff.String()) + + // Test Download specific runID + buff = filebuffer.New([]byte{}) + require.NoError(t, d.DownloadArtifact(taskID, "0", "public/test", buff)) + + _, err = buff.Seek(0, io.SeekStart) + require.NoError(t, err) + require.Equal(t, content, buff.String()) +} diff --git a/dockerworker/dockerimage.go b/dockerworker/dockerimage.go new file mode 100644 index 00000000..aab81ba0 --- /dev/null +++ b/dockerworker/dockerimage.go @@ -0,0 +1,145 @@ +// +build docker + +package dockerworker + +import ( + "context" + "encoding/base64" + "fmt" + "io" + "io/ioutil" + "os" + "strings" + + "github.com/DataDog/zstd" + "github.com/cenkalti/backoff" + docker "github.com/fsouza/go-dockerclient" + "github.com/pierrec/lz4" + "github.com/taskcluster/taskcluster-client-go/tcindex" +) + +func MakeImageName(taskID, runID, artifactName string) string { + return strings.ToLower(base64.RawStdEncoding.EncodeToString([]byte(taskID + runID + artifactName))) +} + +// Ensure image pulls the given image if it doesn't exist +func EnsureImage(ctx context.Context, cli *docker.Client, imageName string, log io.Writer) (img *docker.Image, err error) { + if img, err = cli.InspectImage(imageName); err == nil { + return + } + + err = backoff.Retry(func() error { + return cli.PullImage(docker.PullImageOptions{ + Repository: imageName, + OutputStream: log, + Context: ctx, + }, docker.AuthConfiguration{}) + }, backoff.WithMaxRetries(backoff.WithContext(backoff.NewExponentialBackOff(), ctx), 3)) + + if err != nil { + return + } + + return cli.InspectImage(imageName) +} + +// LoadArtifactImage loads an image from a task artifact +func (d *DockerWorker) LoadArtifactImage(taskID, runID, name string) (img *docker.Image, err error) { + if runID != "" { + d.TaskLogger.Printf("Loading image %s/%s/%s", taskID, runID, name) + } else { + d.TaskLogger.Printf("Loading image %s/%s", taskID, name) + } + + imageName := MakeImageName(taskID, runID, name) + + // If we already downloaded the image, just return it + img, err = d.Client.InspectImage(imageName) + if err == nil { + return + } + + // The downloaded image is written to a file because it might be + // a huge image + f, err := ioutil.TempFile(os.TempDir(), "image") + if err != nil { + return + } + + defer os.Remove(f.Name()) + defer f.Close() + + if err = d.DownloadArtifact(taskID, runID, name, f); err != nil { + return + } + + // rewind the file pointer to read the downloaded file + if _, err = f.Seek(0, io.SeekStart); err != nil { + return + } + + var r io.Reader + + // We accept the image in three formats: .tar, .tar.lz4 and .tar.zst + switch { + case strings.HasSuffix(name, ".zst"): + rc := zstd.NewReader(f) + defer rc.Close() + r = rc + case strings.HasSuffix(name, ".lz4"): + r = lz4.NewReader(f) + case strings.HasSuffix(name, ".tar"): + r = f + default: + err = fmt.Errorf("Not supported format for image artifact %s", name) + return + } + + // We need to rename the image name to avoid cache poisoning. + // Again we store it in a temporary file to avoid exhausting + // the memory + t, err := ioutil.TempFile(os.TempDir(), "image.tar") + if err != nil { + return + } + + defer os.Remove(t.Name()) + defer t.Close() + + err = renameDockerImageTarStream(imageName, r, t) + if err != nil { + return + } + + if _, err = t.Seek(0, io.SeekStart); err != nil { + return + } + + d.Client.LoadImage(docker.LoadImageOptions{ + Context: d.Context, + InputStream: t, + OutputStream: d.LivelogWriter, + }) + + img, err = d.Client.InspectImage(imageName) + + return +} + +func (d *DockerWorker) LoadIndexedImage(namespace, path string) (*docker.Image, error) { + d.TaskLogger.Printf("Loading image %s:%s", namespace, path) + + index := tcindex.NewFromEnv() + + task, err := index.FindTask(namespace) + if err != nil { + return nil, err + } + + return d.LoadArtifactImage(task.TaskID, "", path) +} + +func (d *DockerWorker) LoadImage(name string) (*docker.Image, error) { + d.TaskLogger.Printf("Loading image %s", name) + return EnsureImage(d.Context, d.Client, name, d.LivelogWriter) +} diff --git a/dockerworker/dockerimage_test.go b/dockerworker/dockerimage_test.go new file mode 100644 index 00000000..1e5ebb9a --- /dev/null +++ b/dockerworker/dockerimage_test.go @@ -0,0 +1,62 @@ +// +build docker + +package dockerworker + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +const ( + imageTaskID = "Xx0aPfyOTU2o_0FZnr_AJg" + testNamespace = "garbage.docker-worker-tests.docker-images" + exampleImage = "coreos/example:1.0.0" +) + +func loadArtifactImageTest(t *testing.T, artifactName string) { + d := NewTestDockerWorker(t) + + imageName := MakeImageName(imageTaskID, "", artifactName) + + img, err := d.LoadArtifactImage(imageTaskID, "", artifactName) + require.NoError(t, err) + defer d.Client.RemoveImage(img.ID) + + loadedImage, err := d.Client.InspectImage(imageName) + require.NoError(t, err) + require.Equal(t, loadedImage.ID, img.ID) +} + +func TestLoadArtifactImageTar(t *testing.T) { + loadArtifactImageTest(t, "public/image.tar") +} + +func TestLoadArtifactImageZst(t *testing.T) { + loadArtifactImageTest(t, "public/image.tar.zst") +} + +func TestLoadArtifactImageLz4(t *testing.T) { + loadArtifactImageTest(t, "public/image.tar.lz4") +} + +func TestLoadIndexedImage(t *testing.T) { + d := NewTestDockerWorker(t) + + img, err := d.LoadIndexedImage(testNamespace, "public/image.tar") + require.NoError(t, err) + + require.NoError(t, d.Client.RemoveImage(img.ID)) +} + +func TestLoadImage(t *testing.T) { + d := NewTestDockerWorker(t) + + img, err := d.LoadImage(exampleImage) + require.NoError(t, err) + defer d.Client.RemoveImage(img.ID) + + img2, err := d.Client.InspectImage(exampleImage) + require.NoError(t, err) + require.Equal(t, img.ID, img2.ID) +} diff --git a/dockerworker/dockerworker.go b/dockerworker/dockerworker.go new file mode 100644 index 00000000..fb2017dd --- /dev/null +++ b/dockerworker/dockerworker.go @@ -0,0 +1,70 @@ +// +build docker + +package dockerworker + +import ( + "context" + "encoding/json" + "io" + "log" + + docker "github.com/fsouza/go-dockerclient" + "github.com/taskcluster/taskcluster-client-go/tcqueue" + "github.com/xeipuuv/gojsonschema" +) + +type DockerWorker struct { + LivelogWriter io.Writer + Logger *log.Logger + TaskLogger *log.Logger + Context context.Context + queue *tcqueue.Queue + Client *docker.Client +} + +func New(ctx context.Context, queue *tcqueue.Queue, cli *docker.Client, taskID string, liveLogWriter io.Writer) *DockerWorker { + return &DockerWorker{ + LivelogWriter: liveLogWriter, + Logger: NewLogger(taskID), + TaskLogger: NewTaskLogger(liveLogWriter), + Context: ctx, + queue: queue, + Client: cli, + } +} + +// ValidatePayload validates the docker worker task payload +func ValidatePayload(payload json.RawMessage) (result *gojsonschema.Result, err error) { + const schema = "https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json" + + schemaLoader := gojsonschema.NewReferenceLoader(schema) + docLoader := gojsonschema.NewStringLoader(string(payload)) + return gojsonschema.Validate(schemaLoader, docLoader) +} + +// CreateContainer creates a new docker container to run a task +func (d *DockerWorker) CreateContainer(env []string, image *docker.Image, command []string, privileged bool) (container *docker.Container, err error) { + return d.Client.CreateContainer(docker.CreateContainerOptions{ + Config: &docker.Config{ + Image: image.ID, + Cmd: command, + Hostname: "", + User: "", + AttachStdin: false, + AttachStdout: true, + AttachStderr: true, + Tty: true, + OpenStdin: false, + StdinOnce: false, + Env: env, + }, + HostConfig: &docker.HostConfig{ + Privileged: privileged, + ShmSize: 1800000000, + ExtraHosts: []string{ + "localhost.localdomain:127.0.0.1", // Bug 1488148 + }, + }, + Context: d.Context, + }) +} diff --git a/dockerworker/log.go b/dockerworker/log.go new file mode 100644 index 00000000..f7a5bd9c --- /dev/null +++ b/dockerworker/log.go @@ -0,0 +1,20 @@ +// +build docker + +package dockerworker + +import ( + "fmt" + "io" + "log" + "os" +) + +// NewLogger returns a new Logger that will write to standard output +func NewLogger(taskID string) *log.Logger { + return log.New(os.Stdout, fmt.Sprintf("[generic-docker-worker taskID=\"%s\"] ", taskID), 0) +} + +// NewTaskLogger returns a logger that will writer to the task log +func NewTaskLogger(writer io.Writer) *log.Logger { + return log.New(writer, "", 0) +} diff --git a/dockerworker/renameimage.go b/dockerworker/renameimage.go new file mode 100644 index 00000000..c4877ebd --- /dev/null +++ b/dockerworker/renameimage.go @@ -0,0 +1,155 @@ +// +build docker + +package dockerworker + +import ( + "archive/tar" + "bytes" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "regexp" + + "github.com/pkg/errors" +) + +var allowedDockerLayerIDPattern = regexp.MustCompile(`^[a-fA-f0-9]{64}$`) + +func isAllowedDockerLayerFile(name string) bool { + if len(name) < 64 || !allowedDockerLayerIDPattern.MatchString(name[0:64]) { + return false + } + switch name[64:] { + case ".json", "/", "/json", "/layer.tar", "/VERSION": + return true + default: + return false + } +} + +// renameDockerImageTarStream renames a tar-stream containing a docker image +// in the `docker save` format. The tar-stream is renamed on-the-fly without +// any files being written to disk. +func renameDockerImageTarStream(imageName string, r io.Reader, w io.Writer) error { + // We call rewriteTarStream to rename the tar-stream on-the-fly + nameWritten := false // check that we found 'repositories' or 'manifest.json' + rwerr := rewriteTarStream(r, w, func(hdr *tar.Header, r io.Reader) (*tar.Header, io.Reader, error) { + // Whenever an entry is reached we switch on the name + switch hdr.Name { + case "repositories", "manifest.json": + nameWritten = true + data, err := ioutil.ReadAll(r) + if err != nil { + return nil, nil, errors.Wrap(err, "failed to read the metadata file from docker image tar-stream") + } + switch hdr.Name { + case "repositories": + data, err = rewriteRepositories(imageName, data) + case "manifest.json": + data, err = rewriteManifest(imageName, data) + default: + panic(errors.New("unreachable")) + } + if err != nil { + return nil, nil, err + } + hdr.Size = int64(len(data)) + return hdr, bytes.NewReader(data), nil + default: + if !isAllowedDockerLayerFile(hdr.Name) { + return nil, nil, fmt.Errorf( + "docker image tar-ball contains illegal file: '%s', consider using an older version of docker", hdr.Name, + ) + } + return hdr, r, nil + } + }) + // If there was no error and name wasn't written at some point, then we have problem + if rwerr == nil && !nameWritten { + return fmt.Errorf( + "docker image tar-ball did contain 'manifest.json' or 'repositories', docker image is invalid", + ) + } + return rwerr +} + +// rewriteRepositories will rewrite the 'repositories' file from a docker image +// tar-stream such that the image has a single tag named :latest +func rewriteRepositories(imageName string, data []byte) ([]byte, error) { + // Rewrite the 'repositories' by reading it as JSON from the input stream + var repositories map[string]map[string]string + err := json.Unmarshal(data, &repositories) + if err != nil { + return nil, fmt.Errorf( + "the 'repositories' file in the docker image tar-ball is not valid JSON, error: %s", + err.Error(), + ) + } + // Find the layerID + var layerID string + for _, tags := range repositories { + for _, ID := range tags { + // if there are multiple tags pointing to different IDs that's problematic + if layerID == "" { + layerID = ID + } else if layerID != ID { + return nil, fmt.Errorf( + "the 'repositories' file in the docker image tar-ball contains multiple tags with different layer IDs", + ) + } + } + } + // If there is no imageID we have a problem + if layerID == "" { + return nil, fmt.Errorf( + "the 'repositories' file in the docker image tar-ball did not contain any layer identifiers, image is invalid", + ) + } + // Create result of the rewrite and return it + data, err = json.Marshal(map[string]map[string]string{ + imageName: {"latest": layerID}, // map from imageName to layerID + }) + if err != nil { + panic(errors.Wrap(err, "json.Marshal failed on map[string]map[string]string that can't happen")) + } + return data, nil +} + +// Type to read from manifest.json +type manifestEntry struct { + Config string `json:"Config,omitempty"` + RepoTags []string `json:"RepoTags"` + Layers []string `json:"Layers"` +} + +// rewriteManifest will rewrite the 'manifest.json' file from a docker image +// tar-stream such that the image has a single tag named :latest +func rewriteManifest(imageName string, data []byte) ([]byte, error) { + // Read the manifest.json from input + var manifest []manifestEntry + err := json.Unmarshal(data, &manifest) + if err != nil { + return nil, fmt.Errorf( + "the 'manifest.json' file in the docker image tar-ball is not valid JSON, error: %s", + err.Error(), + ) + } + if len(manifest) == 0 { + return nil, fmt.Errorf( + "the 'manifest.json' file in the docker image tar-ball did not contain any entries", + ) + } + if len(manifest) > 1 { + return nil, fmt.Errorf( + "the 'manifest.json' file in the docker image tar-ball contains more than one entry", + ) + } + // Rewrite the RepoTags and only the first entry as JSON for result + manifest[0].RepoTags = []string{fmt.Sprintf("%s:latest", imageName)} + data, err = json.Marshal([]manifestEntry{manifest[0]}) + if err != nil { + panic(errors.Wrap(err, "json.Marshal failed on []manifestEntry that can't happen")) + } + return data, nil +} diff --git a/dockerworker/renameimage_test.go b/dockerworker/renameimage_test.go new file mode 100644 index 00000000..90dec38e --- /dev/null +++ b/dockerworker/renameimage_test.go @@ -0,0 +1,33 @@ +// +build docker + +package dockerworker + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestRewriteRepositories(t *testing.T) { + data, err := rewriteRepositories("my-image", []byte( + `{"busybox":{"latest":"374004614a75c2c4afd41a3050b5217e282155eb1eb7b4ce8f22aa9f4b17ee57"}}`, + )) + require.NoError(t, err) + require.Contains(t, string(data), "my-image") + require.NotContains(t, string(data), "busybox") +} + +func TestRewriteManifest(t *testing.T) { + data, err := rewriteManifest("my-image", []byte(`[ + { + "Config": "2b8fd9751c4c0f5dd266fcae00707e67a2545ef34f9a29354585f93dac906749.json", + "RepoTags": ["busybox:latest"], + "Layers": [ + "374004614a75c2c4afd41a3050b5217e282155eb1eb7b4ce8f22aa9f4b17ee57/layer.tar" + ] + } + ]`)) + require.NoError(t, err) + require.Contains(t, string(data), "my-image") + require.NotContains(t, string(data), "busybox") +} diff --git a/dockerworker/tarstream.go b/dockerworker/tarstream.go new file mode 100644 index 00000000..a24f3c72 --- /dev/null +++ b/dockerworker/tarstream.go @@ -0,0 +1,67 @@ +// +build docker + +package dockerworker + +import ( + "archive/tar" + "io" + + "github.com/pkg/errors" +) + +// a tarRewriter is a function that given a tar.Header and io.Reader returns a +// tar.Header and io.Reader to be written in place of the entry given. +// +// If a nil header is returned the entry is skipped, if no transformation is +// desired the header and reader given can be returned with any changes. +type tarRewriter func(header *tar.Header, r io.Reader) (*tar.Header, io.Reader, error) + +// rewriteTarStream rewrites a tar-stream read from r and written to w. +// +// For each entry in the tar-stream the rewriter function is called, if it +// returns an error then the process is aborted, if it returns a nil header the +// entry is skipped, otherwise the returned header and body is rewritten to the +// output tar-stream. Notice that rewriter can return the arguments given +// in-order to let entries pass-through. +func rewriteTarStream(r io.Reader, w io.Writer, rewriter tarRewriter) error { + // Create a tar.Reader and tar.Writer, so we can move files between the two + tr := tar.NewReader(r) + tw := tar.NewWriter(w) + for { + // Read a tar.Header + hdr, err := tr.Next() + if err == io.EOF { + break // we're done reading + } + if err != nil { + return errors.Wrap(err, "failed to read tar.Reader while rewriting tar-stream") + } + + // Allow the rewriter function to rewrite the entry + var hdr2 *tar.Header + var body io.Reader + hdr2, body, err = rewriter(hdr, tr) + if err != nil { + return err + } + if hdr2 == nil { + continue // skip this entry + } + + // Write tar.Header and copy the body without making any changes + err = tw.WriteHeader(hdr2) + if err != nil { + return errors.Wrap(err, "failed to write a tar.Header, while rewriting tar-stream") + } + // Copy file body to target as well + _, err = io.Copy(tw, body) + if err != nil { + return errors.Wrap(err, "failed to write to tar.Writer, while rewriting tar-stream") + } + } + // Close the tar.Writer, this ensure any cached bytes or outstanding errors are raised + if err := tw.Close(); err != nil { + return errors.Wrap(err, "failed to close tar.Writer") + } + return nil +} diff --git a/dockerworker/tarstream_test.go b/dockerworker/tarstream_test.go new file mode 100644 index 00000000..3d9a1644 --- /dev/null +++ b/dockerworker/tarstream_test.go @@ -0,0 +1,98 @@ +// +build docker + +package dockerworker + +import ( + "archive/tar" + "bytes" + "crypto/rand" + "errors" + "fmt" + "io" + "io/ioutil" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestRewriteTarStream(t *testing.T) { + // Create a big random blob + blob := make([]byte, 6*1024*1024) + _, err := rand.Read(blob) + require.NoError(t, err) + + // Create tar file + var input = []struct { + Name string + Data []byte + }{ + {"hello.txt", []byte("hello-world")}, + {"rewritten.txt", []byte("data-to-be-rewritten")}, + {"filtered.txt", blob}, + {"to-rename.txt", []byte("data-in-renamed-file")}, + {"blob.bin", blob}, + } + b := bytes.NewBuffer(nil) + tw := tar.NewWriter(b) + for _, f := range input { + err = tw.WriteHeader(&tar.Header{ + Name: f.Name, + Mode: 0600, + Size: int64(len(f.Data)), + }) + require.NoError(t, err) + _, err = tw.Write(f.Data) + require.NoError(t, err) + } + + // Rewrite the tar file as stream + out := bytes.NewBuffer(nil) + err = rewriteTarStream(b, out, func(hdr *tar.Header, r io.Reader) (*tar.Header, io.Reader, error) { + switch hdr.Name { + case "hello.txt": + return hdr, r, nil + case "rewritten.txt": + data := []byte("data-that-was-rewritten") + hdr.Size = int64(len(data)) + return hdr, bytes.NewReader(data), nil + case "filtered.txt": + return nil, nil, nil + case "to-rename.txt": + hdr.Name = "was-renamed.txt" + return hdr, r, nil + case "blob.bin": + return hdr, r, nil + default: + return nil, nil, errors.New("unhandled file name in test case") + } + }) + require.NoError(t, err) + + // Read the rewritten tar-stream + tr := tar.NewReader(out) + + // declare expected output + var output = []struct { + Name string + Data string + }{ + {"hello.txt", "hello-world"}, + {"rewritten.txt", "data-that-was-rewritten"}, + {"was-renamed.txt", "data-in-renamed-file"}, + {"blob.bin", string(blob)}, + } + for _, f := range output { + fmt.Printf(" - verify %s\n", f.Name) + var hdr *tar.Header + hdr, err = tr.Next() + require.NoError(t, err) + require.Equal(t, f.Name, hdr.Name) + var data []byte + data, err = ioutil.ReadAll(tr) + require.NoError(t, err) + require.Equal(t, f.Data, string(data)) + } + fmt.Println(" - verify that we have EOF") + _, err = tr.Next() + require.Equal(t, io.EOF, err) +} diff --git a/dockerworker/util.go b/dockerworker/util.go new file mode 100644 index 00000000..c4ae56c7 --- /dev/null +++ b/dockerworker/util.go @@ -0,0 +1,20 @@ +// +build docker + +package dockerworker + +import ( + "context" + "os" + "testing" + + docker "github.com/fsouza/go-dockerclient" + "github.com/stretchr/testify/require" + "github.com/taskcluster/taskcluster-client-go/tcqueue" +) + +// NewTestDockerWorker returns a new DockerWorker object for tests +func NewTestDockerWorker(t *testing.T) *DockerWorker { + cli, err := docker.NewClientFromEnv() + require.NoError(t, err) + return New(context.Background(), tcqueue.NewFromEnv(), cli, "testtaskid", os.Stdout) +} diff --git a/dockerworker/util_test.go b/dockerworker/util_test.go new file mode 100644 index 00000000..61c6d160 --- /dev/null +++ b/dockerworker/util_test.go @@ -0,0 +1,58 @@ +// +build docker + +package dockerworker + +import ( + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/taskcluster/slugid-go/slugid" + tcclient "github.com/taskcluster/taskcluster-client-go" + "github.com/taskcluster/taskcluster-client-go/tcqueue" +) + +func createDummyArtifact(d *DockerWorker, name, content string) (taskID string, err error) { + task := tcqueue.TaskDefinitionRequest{ + Created: tcclient.Time(time.Now()), + Deadline: tcclient.Time(time.Now().Add(10 * time.Minute)), + Expires: tcclient.Time(time.Now().Add(48 * time.Hour)), + Payload: json.RawMessage("{}"), + Metadata: tcqueue.TaskMetadata{ + Description: "generic-worker dummy task", + Name: "generic-worker dummy task", + Owner: "wcosta@mozilla.com", + Source: "https://www.mozilla.org", + }, + Priority: "lowest", + ProvisionerID: "null-provisioner", + WorkerType: "docker-worker", + SchedulerID: "docker-worker-tests", + } + + taskID = slugid.Nice() + + if _, err = d.queue.CreateTask(taskID, &task); err != nil { + return + } + + fmt.Printf("Created task %s\n", taskID) + + claimResp, err := d.queue.ClaimTask(taskID, "0", &tcqueue.TaskClaimRequest{ + WorkerGroup: "docker-worker", + WorkerID: "docker-worker", + }) + + if err != nil { + return + } + + // We cannot call report completed without claiming the task first + defer d.queue.ReportCompleted(taskID, "0") + + d.ScheduleReclaim(claimResp) + + err = d.UploadArtifact(taskID, "0", name, strings.NewReader(content)) + return +} diff --git a/imageid_test.go b/imageid_test.go new file mode 100644 index 00000000..3aa35731 --- /dev/null +++ b/imageid_test.go @@ -0,0 +1,9 @@ +package main + +import "github.com/taskcluster/slugid-go/slugid" + +var ( + // all tests can share taskGroupId so we can view all test tasks in same + // graph later for troubleshooting + taskGroupID = slugid.Nice() +) diff --git a/intermittent_task_test.go b/intermittent_task_test.go index 4209db9b..d9af9f78 100644 --- a/intermittent_task_test.go +++ b/intermittent_task_test.go @@ -1,3 +1,5 @@ +// +build !docker + package main import ( diff --git a/main.go b/main.go index 0f5fe4fa..75f4f872 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,6 @@ //go:generate gw-codegen file://all-unix-style.yml generated_all-unix-style.go !windows //go:generate gw-codegen file://windows.yml generated_windows.go -//go:generate gw-codegen https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/payload.json dockerworker/payload.go +//go:generate gw-codegen https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json dockerworker/payload.go package main @@ -28,7 +28,6 @@ import ( "github.com/taskcluster/taskcluster-client-go/tcawsprovisioner" "github.com/taskcluster/taskcluster-client-go/tcpurgecache" "github.com/taskcluster/taskcluster-client-go/tcqueue" - "github.com/xeipuuv/gojsonschema" ) var ( @@ -809,75 +808,6 @@ func ClaimWork() *TaskRun { } } -func (task *TaskRun) validatePayload() *CommandExecutionError { - jsonPayload := task.Definition.Payload - log.Printf("JSON payload: %s", jsonPayload) - schemaLoader := gojsonschema.NewStringLoader(taskPayloadSchema()) - docLoader := gojsonschema.NewStringLoader(string(jsonPayload)) - result, err := gojsonschema.Validate(schemaLoader, docLoader) - if err != nil { - return MalformedPayloadError(err) - } - if !result.Valid() { - task.Error("TASK FAIL since the task payload is invalid. See errors:") - for _, desc := range result.Errors() { - task.Errorf("- %s", desc) - } - // Dealing with Invalid Task Payloads - // ---------------------------------- - // If the task payload is malformed or invalid, keep in mind that the - // queue doesn't validate the contents of the `task.payload` property, - // the worker may resolve the current run by reporting an exception. - // When reporting an exception, using `tcqueue.ReportException` the - // worker should give a `reason`. If the worker is unable execute the - // task specific payload/code/logic, it should report exception with - // the reason `malformed-payload`. - // - // This can also be used if an external resource that is referenced in - // a declarative nature doesn't exist. Generally, it should be used if - // we can be certain that another run of the task will have the same - // result. This differs from `tcqueue.ReportFailed` in the sense that we - // report a failure if the task specific code failed. - // - // Most tasks includes a lot of declarative steps, such as poll a - // docker image, create cache folder, decrypt encrypted environment - // variables, set environment variables and etc. Clearly, if decryption - // of environment variables fail, there is no reason to retry the task. - // Nor can it be said that the task failed, because the error wasn't - // cause by execution of Turing complete code. - // - // If however, we run some executable code referenced in `task.payload` - // and the code crashes or exists non-zero, then the task is said to be - // failed. The difference is whether or not the unexpected behavior - // happened before or after the execution of task specific Turing - // complete code. - return MalformedPayloadError(fmt.Errorf("Validation of payload failed for task %v", task.TaskID)) - } - err = json.Unmarshal(jsonPayload, &task.Payload) - if err != nil { - return MalformedPayloadError(err) - } - for _, artifact := range task.Payload.Artifacts { - // The default artifact expiry is task expiry, but is only applied when - // the task artifacts are resolved. We intentionally don't modify - // task.Payload otherwise it no longer reflects the real data defined - // in the task. - if !time.Time(artifact.Expires).IsZero() { - // Don't be too strict: allow 1s discrepancy to account for - // possible timestamp rounding on upstream systems - if time.Time(artifact.Expires).Add(time.Second).Before(time.Time(task.Definition.Deadline)) { - return MalformedPayloadError(fmt.Errorf("Malformed payload: artifact '%v' expires before task deadline (%v is before %v)", artifact.Path, artifact.Expires, task.Definition.Deadline)) - } - // Don't be too strict: allow 1s discrepancy to account for - // possible timestamp rounding on upstream systems - if time.Time(artifact.Expires).After(time.Time(task.Definition.Expires).Add(time.Second)) { - return MalformedPayloadError(fmt.Errorf("Malformed payload: artifact '%v' expires after task expiry (%v is after %v)", artifact.Path, artifact.Expires, task.Definition.Expires)) - } - } - } - return nil -} - type CommandExecutionError struct { TaskStatus TaskStatus Cause error diff --git a/main_test.go b/main_test.go index b512c001..6b6fa421 100644 --- a/main_test.go +++ b/main_test.go @@ -1,3 +1,5 @@ +// +build !docker + package main import ( diff --git a/mounts_test.go b/mounts_test.go index d56c968d..fb57eb76 100644 --- a/mounts_test.go +++ b/mounts_test.go @@ -1,3 +1,5 @@ +// +build !docker + package main import ( diff --git a/os_groups_all-unix-style_test.go b/os_groups_all-unix-style_test.go index f58db001..5d8f9578 100644 --- a/os_groups_all-unix-style_test.go +++ b/os_groups_all-unix-style_test.go @@ -1,4 +1,4 @@ -// +build !windows +// +build !windows,!docker package main diff --git a/os_groups_windows_test.go b/os_groups_windows_test.go index 99da5b2c..b41da91d 100644 --- a/os_groups_windows_test.go +++ b/os_groups_windows_test.go @@ -1,3 +1,5 @@ +// +build !docker + package main import ( diff --git a/payload_test.go b/payload_test.go index bf56cea9..bb58e744 100644 --- a/payload_test.go +++ b/payload_test.go @@ -1,3 +1,5 @@ +// +build !docker + package main import ( diff --git a/plat_all-unix-style.go b/plat_all-unix-style.go index be0abc97..7215bf4f 100644 --- a/plat_all-unix-style.go +++ b/plat_all-unix-style.go @@ -1,4 +1,4 @@ -// +build !windows +// +build !windows,!docker package main diff --git a/plat_darwin.go b/plat_darwin.go index c76555e9..0d9db925 100644 --- a/plat_darwin.go +++ b/plat_darwin.go @@ -1,3 +1,5 @@ +// +build !docker + package main import ( diff --git a/plat_docker.go b/plat_docker.go new file mode 100644 index 00000000..4a670017 --- /dev/null +++ b/plat_docker.go @@ -0,0 +1,153 @@ +// +build docker,!windows + +package main + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "runtime" + "strconv" + "time" + + docker "github.com/fsouza/go-dockerclient" + "github.com/taskcluster/generic-worker/dockerworker" + "github.com/taskcluster/generic-worker/process" + "github.com/taskcluster/shell" +) + +var cli *docker.Client + +func (task *TaskRun) EnvVars() []string { + env := make([]string, 0, len(task.Payload.Env)) + + for key, value := range task.Payload.Env { + env = append(env, fmt.Sprintf("%s=%s", key, value)) + } + + return env +} + +func platformFeatures() []Feature { + return []Feature{} +} + +type PlatformData struct { + Image json.RawMessage +} + +type TaskContext struct { + TaskDir string +} + +func (task *TaskRun) NewPlatformData() (pd *PlatformData, err error) { + return &PlatformData{}, nil +} + +func (pd *PlatformData) ReleaseResources() error { + return nil +} + +func immediateShutdown(cause string) { +} + +func immediateReboot() { +} + +func init() { + var err error + if cli, err = docker.NewClientFromEnv(); err != nil { + panic(err) + } + + pwd, err := os.Getwd() + if err != nil { + panic(err) + } + + taskContext = &TaskContext{ + TaskDir: pwd, + } +} + +func (task *TaskRun) prepareCommand(index int) error { + return nil +} + +func (task *TaskRun) generateCommand(index int) error { + var err error + task.Commands[index], err = process.NewCommand( + dockerworker.New(context.Background(), task.Queue, cli, task.TaskID, task.logWriter), + task.Payload.Command[index], + task.PlatformData.Image, + taskContext.TaskDir, + task.EnvVars(), + ) + return err +} + +func purgeOldTasks() error { + return nil +} + +func install(arguments map[string]interface{}) (err error) { + return nil +} + +func makeDirReadableForTaskUser(task *TaskRun, dir string) error { + // No user separation yet + return nil +} + +func makeDirUnreadableForTaskUser(task *TaskRun, dir string) error { + // No user separation yet + return nil +} + +func RenameCrossDevice(oldpath, newpath string) error { + return nil +} + +func (task *TaskRun) formatCommand(index int) string { + return shell.Escape(task.Payload.Command[index]...) +} + +func prepareTaskUser(username string) bool { + return false +} + +func deleteTaskDir(path string) error { + return nil +} + +func defaultTasksDir() string { + // assume all user home directories are all in same folder, i.e. the parent + // folder of the current user's home folder + return filepath.Dir(os.Getenv("HOME")) +} + +// N/A for unix - just a windows thing +func AutoLogonCredentials() (string, string) { + return "", "" +} + +func chooseTaskDirName() string { + return "task_" + strconv.Itoa(int(time.Now().Unix())) +} + +func unsetAutoLogon() { +} + +func deleteTaskDirs() { + //removeTaskDirs(config.TasksDir) +} + +func GrantSIDFullControlOfInteractiveWindowsStationAndDesktop(sid string) (err error) { + return fmt.Errorf( + "Cannot grant %v full control of interactive windows station and desktop; platform %v does not have such entities", + sid, + runtime.GOOS, + ) +} diff --git a/plat_windows.go b/plat_windows.go index 25839b70..7c4dde05 100644 --- a/plat_windows.go +++ b/plat_windows.go @@ -1,3 +1,5 @@ +// +build !docker + package main import ( diff --git a/process/logininfo_windows.go b/process/logininfo_windows.go index f70464b0..0336dd8d 100644 --- a/process/logininfo_windows.go +++ b/process/logininfo_windows.go @@ -1,3 +1,5 @@ +// +build !docker + package process import ( diff --git a/process/process_darwin.go b/process/process_all-unix-style.go similarity index 99% rename from process/process_darwin.go rename to process/process_all-unix-style.go index 3e55e90c..d4ab882a 100644 --- a/process/process_darwin.go +++ b/process/process_all-unix-style.go @@ -1,3 +1,5 @@ +// +build !windows,!docker + package process import ( diff --git a/process/process_docker.go b/process/process_docker.go new file mode 100644 index 00000000..d99a7921 --- /dev/null +++ b/process/process_docker.go @@ -0,0 +1,141 @@ +// +build docker,!windows + +package process + +import ( + "encoding/json" + "fmt" + "io" + "os" + "sync" + "time" + + docker "github.com/fsouza/go-dockerclient" + "github.com/taskcluster/generic-worker/dockerworker" +) + +type Result struct { + SystemError error + exitCode int + Duration time.Duration +} + +type Command struct { + mutex sync.RWMutex + worker *dockerworker.DockerWorker + writer io.Writer + cmd []string + workingDirectory string + image json.RawMessage + env []string +} + +func (c *Command) ensureImage() (img *docker.Image, err error) { + var imageName string + var imageArtifact dockerworker.DockerImageArtifact + + if err = json.Unmarshal(c.image, &imageName); err == nil { + img, err = c.worker.LoadImage(imageName) + } else if err = json.Unmarshal(c.image, &imageArtifact); err == nil { + img, err = c.worker.LoadArtifactImage(imageArtifact.TaskID, "", imageArtifact.Path) + } + + return +} + +func (c *Command) DirectOutput(writer io.Writer) { + c.writer = writer +} + +func (c *Command) String() string { + return fmt.Sprintf("%q", c.cmd) +} + +func (c *Command) Execute() (r *Result) { + r = &Result{} + + c.mutex.Lock() + defer c.mutex.Unlock() + + image, err := c.ensureImage() + if err != nil { + r.SystemError = fmt.Errorf("Error downloading image: %v", err) + return + } + + container, err := c.worker.CreateContainer(c.env, image, c.cmd, false) + if err != nil { + r.SystemError = fmt.Errorf("Error creating a new container: %v", err) + return + } + + if err = c.worker.Client.StartContainerWithContext(container.ID, nil, c.worker.Context); err != nil { + r.SystemError = fmt.Errorf("Error starting container: %v", err) + return + } + + started := time.Now() + if r.exitCode, err = c.worker.Client.WaitContainer(container.ID); err != nil { + r.SystemError = fmt.Errorf("Error wating for container to finish: %v", err) + return + } + + err = c.worker.Client.Logs(docker.LogsOptions{ + Context: c.worker.Context, + Container: container.ID, + OutputStream: c.worker.LivelogWriter, + ErrorStream: c.worker.LivelogWriter, + Stdout: true, + Stderr: true, + RawTerminal: true, + Timestamps: true, + }) + if err != nil { + r.SystemError = fmt.Errorf("Error pulling container logs: %v", err) + } + + finished := time.Now() + r.Duration = finished.Sub(started) + + return +} + +func (r *Result) CrashCause() error { + return r.SystemError +} + +func (r *Result) Crashed() bool { + return r.SystemError != nil +} + +func (r *Result) FailureCause() error { + return fmt.Errorf("Exit code %v", r.exitCode) +} + +func (r *Result) Failed() bool { + return r.exitCode != 0 +} + +func (r *Result) ExitCode() int { + if r.SystemError != nil { + return -2 + } + + return r.exitCode +} + +func NewCommand(worker *dockerworker.DockerWorker, commandLine []string, image json.RawMessage, workingDirectory string, env []string) (*Command, error) { + c := &Command{ + worker: worker, + writer: os.Stdout, + cmd: commandLine, + workingDirectory: workingDirectory, + image: image, + env: env, + } + return c, nil +} + +func (c *Command) Kill() ([]byte, error) { + return []byte{}, nil +} diff --git a/process/process_docker_test.go b/process/process_docker_test.go new file mode 100644 index 00000000..191510fb --- /dev/null +++ b/process/process_docker_test.go @@ -0,0 +1,52 @@ +// +build docker + +package process + +import ( + "encoding/json" + "fmt" + "io" + "strings" + "testing" + + docker "github.com/fsouza/go-dockerclient" + "github.com/mattetti/filebuffer" + "github.com/stretchr/testify/require" + "github.com/taskcluster/generic-worker/dockerworker" +) + +func TestProcess(t *testing.T) { + const message = "Would you kindly?" + const imageName = "ubuntu:14.04" + + image, err := json.Marshal(imageName) + require.NoError(t, err) + + d := dockerworker.NewTestDockerWorker(t) + if _, err = d.Client.InspectImage(imageName); err == nil { + require.NoError(t, d.Client.RemoveImageExtended(imageName, docker.RemoveImageOptions{ + Force: true, + Context: d.Context, + })) + } + buff := filebuffer.New([]byte{}) + d.LivelogWriter = buff + + c, err := NewCommand( + d, + []string{"/bin/bash", "-c", "echo $MESSAGE"}, + image, + "/", + []string{fmt.Sprintf("MESSAGE='%s'", message)}, + ) + require.NoError(t, err) + + r := c.Execute() + require.NoError(t, r.SystemError) + require.Equal(t, r.ExitCode(), 0) + + _, err = buff.Seek(0, io.SeekStart) + require.NoError(t, err) + + require.True(t, strings.Contains(buff.String(), message)) +} diff --git a/process/process_linux.go b/process/process_linux.go deleted file mode 100644 index abb7aa3f..00000000 --- a/process/process_linux.go +++ /dev/null @@ -1,139 +0,0 @@ -package process - -import ( - "fmt" - "io" - "os" - "sync" - "time" - - "github.com/docker/docker/api/types" - "github.com/docker/docker/api/types/container" - "github.com/docker/docker/client" - "golang.org/x/net/context" -) - -type Result struct { - SystemError error - ExitCode int64 - Duration time.Duration -} - -type Command struct { - mutex sync.RWMutex - ctx context.Context - cli *client.Client - resp container.ContainerCreateCreatedBody - writer io.Writer - cmd []string - workingDirectory string - env []string -} - -var cli *client.Client - -func init() { - var err error - cli, err = client.NewClientWithOpts(client.WithVersion("1.24")) - if err != nil { - panic(err) - } -} - -func (c *Command) DirectOutput(writer io.Writer) { - c.writer = writer -} - -func (c *Command) String() string { - return fmt.Sprintf("%q", c.cmd) -} - -func (c *Command) Execute() (r *Result) { - - var outPull io.ReadCloser - r = &Result{} - c.mutex.Lock() - defer c.mutex.Unlock() - outPull, r.SystemError = cli.ImagePull(c.ctx, "ubuntu", types.ImagePullOptions{}) - if r.SystemError != nil { - return - } - defer outPull.Close() - io.Copy(c.writer, outPull) - - c.resp, r.SystemError = c.cli.ContainerCreate( - c.ctx, - &container.Config{ - Image: "ubuntu", - Cmd: c.cmd, - WorkingDir: c.workingDirectory, - Env: c.env, - }, - nil, - nil, - "", - ) - if r.SystemError != nil { - return - } - - r.SystemError = c.cli.ContainerStart(c.ctx, c.resp.ID, types.ContainerStartOptions{}) - if r.SystemError != nil { - return - } - - started := time.Now() - res, errch := c.cli.ContainerWait(c.ctx, c.resp.ID, container.WaitConditionNotRunning) - select { - case r.SystemError = <-errch: - if r.SystemError != nil { - return - } - case exitCode := <-res: - r.ExitCode = exitCode.StatusCode - } - - var outLogs io.ReadCloser - outLogs, r.SystemError = c.cli.ContainerLogs(c.ctx, c.resp.ID, types.ContainerLogsOptions{ShowStdout: true}) - if r.SystemError != nil { - return - } - defer outLogs.Close() - io.Copy(c.writer, outLogs) - - finished := time.Now() - r.Duration = finished.Sub(started) - return -} - -func (r *Result) CrashCause() error { - return r.SystemError -} - -func (r *Result) Crashed() bool { - return r.SystemError != nil -} - -func (r *Result) FailureCause() error { - return fmt.Errorf("Exit code %v", r.ExitCode) -} - -func (r *Result) Failed() bool { - return r.ExitCode != 0 -} - -func NewCommand(commandLine []string, workingDirectory string, env []string) (*Command, error) { - c := &Command{ - ctx: context.Background(), - writer: os.Stdout, - cmd: commandLine, - workingDirectory: workingDirectory, - cli: cli, - env: env, - } - return c, nil -} - -func (c *Command) Kill() error { - return nil -} diff --git a/process/process_windows.go b/process/process_windows.go index ea9a52dc..b184969e 100644 --- a/process/process_windows.go +++ b/process/process_windows.go @@ -1,3 +1,5 @@ +// +build !docker + package process import ( diff --git a/supersede_test.go b/supersede_test.go index 4f6b9eba..e6ce2765 100644 --- a/supersede_test.go +++ b/supersede_test.go @@ -1,3 +1,5 @@ +// +build !docker + package main import ( diff --git a/taskcluster_proxy_test.go b/taskcluster_proxy_test.go index 71b533ab..4b749abd 100644 --- a/taskcluster_proxy_test.go +++ b/taskcluster_proxy_test.go @@ -1,3 +1,5 @@ +// +build !docker + package main import ( diff --git a/taskstatus_test.go b/taskstatus_test.go index 5b3225df..4553e021 100644 --- a/taskstatus_test.go +++ b/taskstatus_test.go @@ -1,3 +1,5 @@ +// +build !docker + package main import ( diff --git a/validate.go b/validate.go new file mode 100644 index 00000000..5e325f55 --- /dev/null +++ b/validate.go @@ -0,0 +1,81 @@ +// +build !docker + +package main + +import ( + "encoding/json" + "fmt" + "log" + "time" + + "github.com/xeipuuv/gojsonschema" +) + +func (task *TaskRun) validatePayload() *CommandExecutionError { + jsonPayload := task.Definition.Payload + log.Printf("JSON payload: %s", jsonPayload) + schemaLoader := gojsonschema.NewStringLoader(taskPayloadSchema()) + docLoader := gojsonschema.NewStringLoader(string(jsonPayload)) + result, err := gojsonschema.Validate(schemaLoader, docLoader) + if err != nil { + return MalformedPayloadError(err) + } + if !result.Valid() { + task.Error("TASK FAIL since the task payload is invalid. See errors:") + for _, desc := range result.Errors() { + task.Errorf("- %s", desc) + } + // Dealing with Invalid Task Payloads + // ---------------------------------- + // If the task payload is malformed or invalid, keep in mind that the + // queue doesn't validate the contents of the `task.payload` property, + // the worker may resolve the current run by reporting an exception. + // When reporting an exception, using `tcqueue.ReportException` the + // worker should give a `reason`. If the worker is unable execute the + // task specific payload/code/logic, it should report exception with + // the reason `malformed-payload`. + // + // This can also be used if an external resource that is referenced in + // a declarative nature doesn't exist. Generally, it should be used if + // we can be certain that another run of the task will have the same + // result. This differs from `tcqueue.ReportFailed` in the sense that we + // report a failure if the task specific code failed. + // + // Most tasks includes a lot of declarative steps, such as poll a + // docker image, create cache folder, decrypt encrypted environment + // variables, set environment variables and etc. Clearly, if decryption + // of environment variables fail, there is no reason to retry the task. + // Nor can it be said that the task failed, because the error wasn't + // cause by execution of Turing complete code. + // + // If however, we run some executable code referenced in `task.payload` + // and the code crashes or exists non-zero, then the task is said to be + // failed. The difference is whether or not the unexpected behavior + // happened before or after the execution of task specific Turing + // complete code. + return MalformedPayloadError(fmt.Errorf("Validation of payload failed for task %v", task.TaskID)) + } + err = json.Unmarshal(jsonPayload, &task.Payload) + if err != nil { + return MalformedPayloadError(err) + } + for _, artifact := range task.Payload.Artifacts { + // The default artifact expiry is task expiry, but is only applied when + // the task artifacts are resolved. We intentionally don't modify + // task.Payload otherwise it no longer reflects the real data defined + // in the task. + if !time.Time(artifact.Expires).IsZero() { + // Don't be too strict: allow 1s discrepancy to account for + // possible timestamp rounding on upstream systems + if time.Time(artifact.Expires).Add(time.Second).Before(time.Time(task.Definition.Deadline)) { + return MalformedPayloadError(fmt.Errorf("Malformed payload: artifact '%v' expires before task deadline (%v is before %v)", artifact.Path, artifact.Expires, task.Definition.Deadline)) + } + // Don't be too strict: allow 1s discrepancy to account for + // possible timestamp rounding on upstream systems + if time.Time(artifact.Expires).After(time.Time(task.Definition.Expires).Add(time.Second)) { + return MalformedPayloadError(fmt.Errorf("Malformed payload: artifact '%v' expires after task expiry (%v is after %v)", artifact.Path, artifact.Expires, task.Definition.Expires)) + } + } + } + return nil +} diff --git a/validate_docker.go b/validate_docker.go new file mode 100644 index 00000000..6ba2d294 --- /dev/null +++ b/validate_docker.go @@ -0,0 +1,58 @@ +// +build docker + +package main + +import ( + "encoding/json" + "fmt" + + "github.com/taskcluster/generic-worker/dockerworker" +) + +func dockerWorkerToGenericWorkerPayload(dw *dockerworker.DockerWorkerPayload) *GenericWorkerPayload { + artifacts := make([]Artifact, 0, len(dw.Artifacts)) + for name, artifact := range dw.Artifacts { + artifacts = append(artifacts, Artifact{ + Expires: artifact.Expires, + Name: name, + Path: artifact.Path, + }) + } + + return &GenericWorkerPayload{ + Artifacts: artifacts, + Command: [][]string{dw.Command}, + Env: dw.Env, + MaxRunTime: int64(dw.MaxRunTime), + SupersederURL: dw.SupersederURL, + Features: FeatureFlags{}, + Mounts: []json.RawMessage{}, + OnExitStatus: ExitCodeHandling{}, + OSGroups: []string{}, + } +} + +func (task *TaskRun) validatePayload() *CommandExecutionError { + result, err := dockerworker.ValidatePayload(task.Definition.Payload) + if err != nil { + return MalformedPayloadError(err) + } + + if !result.Valid() { + task.Error("TASK FAIL since the task payload is invalid. See errors:") + for _, desc := range result.Errors() { + task.Errorf("- %s", desc) + } + return MalformedPayloadError(fmt.Errorf("Validation of payload failed for task %v", task.TaskID)) + } + + var payload dockerworker.DockerWorkerPayload + if err = json.Unmarshal(task.Definition.Payload, &payload); err != nil { + return MalformedPayloadError(err) + } + + task.PlatformData.Image = payload.Image + task.Payload = *dockerWorkerToGenericWorkerPayload(&payload) + + return nil +}