diff --git a/.gitignore b/.gitignore index 6ce29864..8f466748 100644 --- a/.gitignore +++ b/.gitignore @@ -47,3 +47,6 @@ directory-caches.json file-caches.json tasks-resolved-count.txt /revision.txt + +# binary file +generic-worker diff --git a/.taskcluster.yml b/.taskcluster.yml index 522e04d9..155d9f68 100644 --- a/.taskcluster.yml +++ b/.taskcluster.yml @@ -70,8 +70,8 @@ tasks: maxRunTime: 3600 command: - set CGO_ENABLED=0 - - set GOPATH=%CD%\gopath1.10.3 - - set GOROOT=%CD%\go1.10.3\go + - set GOPATH=%CD%\gopath1.10.4 + - set GOROOT=%CD%\go1.10.4\go - set PATH=%CD%\git\cmd;%GOPATH%\bin;%GOROOT%\bin;%PATH% - git config --global core.autocrlf false - go version @@ -111,16 +111,16 @@ tasks: - ineffassign . artifacts: - name: public/build/generic-worker-windows-amd64.exe - path: gopath1.10.3\bin\generic-worker.exe + path: gopath1.10.4\bin\generic-worker.exe expires: "{{ '2 weeks' | $fromNow }}" type: file mounts: - cacheName: generic-worker-checkout - directory: gopath1.10.3\src + directory: gopath1.10.4\src - content: - url: https://storage.googleapis.com/golang/go1.10.3.windows-amd64.zip - sha256: a3f19d4fc0f4b45836b349503e347e64e31ab830dedac2fc9c390836d4418edb - directory: go1.10.3 + url: https://storage.googleapis.com/golang/go1.10.4.windows-amd64.zip + sha256: 5499aa98399664df8dc1da5c3aaaed14b3130b79c713b5677a0ee9e93854476c + directory: go1.10.4 format: zip - content: url: https://github.com/git-for-windows/git/releases/download/v2.14.1.windows.1/MinGit-2.14.1-64-bit.zip @@ -152,8 +152,8 @@ tasks: maxRunTime: 3600 command: - set CGO_ENABLED=0 - - set GOPATH=%CD%\gopath1.10.3 - - set GOROOT=%CD%\go1.10.3\go + - set GOPATH=%CD%\gopath1.10.4 + - set GOROOT=%CD%\go1.10.4\go - set PATH=%CD%\git\bin;%GOPATH%\bin;%GOROOT%\bin;%PATH% - git config --global core.autocrlf false - go version @@ -182,16 +182,16 @@ tasks: - ineffassign . artifacts: - name: public/build/generic-worker-windows-386.exe - path: gopath1.10.3\bin\generic-worker.exe + path: gopath1.10.4\bin\generic-worker.exe expires: "{{ '2 weeks' | $fromNow }}" type: file mounts: - cacheName: generic-worker-checkout - directory: gopath1.10.3\src + directory: gopath1.10.4\src - content: - url: https://storage.googleapis.com/golang/go1.10.3.windows-386.zip - sha256: 89696a29bdf808fa9861216a21824ae8eb2e750a54b1424ce7f2a177e5cd1466 - directory: go1.10.3 + url: https://storage.googleapis.com/golang/go1.10.4.windows-386.zip + sha256: 407e5619048c427de4a65b26edb17d54c220f8c30ebd358961b1785a38394ec9 + directory: go1.10.4 format: zip - content: url: https://github.com/git-for-windows/git/releases/download/v2.11.0.windows.3/Git-2.11.0.3-32-bit.tar.bz2 @@ -223,8 +223,8 @@ tasks: maxRunTime: 3600 command: - set CGO_ENABLED=0 - - set GOPATH=%CD%\gopath1.10.3 - - set GOROOT=%CD%\go1.10.3\go + - set GOPATH=%CD%\gopath1.10.4 + - set GOROOT=%CD%\go1.10.4\go - set PATH=%CD%\git\cmd;%GOPATH%\bin;%GOROOT%\bin;%PATH% - git config --global core.autocrlf false - go version @@ -264,16 +264,16 @@ tasks: - ineffassign . 
artifacts: - name: public/build/generic-worker-windows-amd64.exe - path: gopath1.10.3\bin\generic-worker.exe + path: gopath1.10.4\bin\generic-worker.exe expires: "{{ '2 weeks' | $fromNow }}" type: file mounts: - cacheName: generic-worker-checkout - directory: gopath1.10.3\src + directory: gopath1.10.4\src - content: - url: https://storage.googleapis.com/golang/go1.10.3.windows-amd64.zip - sha256: a3f19d4fc0f4b45836b349503e347e64e31ab830dedac2fc9c390836d4418edb - directory: go1.10.3 + url: https://storage.googleapis.com/golang/go1.10.4.windows-amd64.zip + sha256: 5499aa98399664df8dc1da5c3aaaed14b3130b79c713b5677a0ee9e93854476c + directory: go1.10.4 format: zip - content: url: https://github.com/git-for-windows/git/releases/download/v2.14.1.windows.1/MinGit-2.14.1-64-bit.zip @@ -308,8 +308,8 @@ tasks: - -vxec - | export CGO_ENABLED=0 - export GOROOT="$(pwd)/go1.10.3/go" - export GOPATH="$(pwd)/gopath1.10.3" + export GOROOT="$(pwd)/go1.10.4/go" + export GOPATH="$(pwd)/gopath1.10.4" export PATH="${GOPATH}/bin:${GOROOT}/bin:${PATH}" go version go env @@ -335,16 +335,16 @@ tasks: ineffassign . artifacts: - name: public/build/generic-worker-darwin-amd64 - path: gopath1.10.3/bin/generic-worker + path: gopath1.10.4/bin/generic-worker expires: "{{ '2 weeks' | $fromNow }}" type: file mounts: - cacheName: generic-worker-checkout - directory: gopath1.10.3/src + directory: gopath1.10.4/src - content: - url: https://storage.googleapis.com/golang/go1.10.3.darwin-amd64.tar.gz - sha256: 131fd430350a3134d352ee75c5ca456cdf4443e492d0527a9651c7c04e2b458d - directory: go1.10.3 + url: https://storage.googleapis.com/golang/go1.10.4.darwin-amd64.tar.gz + sha256: 2ba324f01de2b2ece0376f6d696570a4c5c13db67d00aadfd612adc56feff587 + directory: go1.10.4 format: tar.gz @@ -380,8 +380,8 @@ tasks: # - -vxec # - | # export CGO_ENABLED=0 - # export GOROOT="$(pwd)/go1.10.3/go" - # export GOPATH="$(pwd)/gopath1.10.3" + # export GOROOT="$(pwd)/go1.10.4/go" + # export GOPATH="$(pwd)/gopath1.10.4" # export PATH="${GOPATH}/bin:${GOROOT}/bin:${PATH}" # export CGO_ENABLED=0 # go version @@ -407,16 +407,16 @@ tasks: # ineffassign . # artifacts: # - name: public/build/generic-worker-linux-armv6l - # path: gopath1.10.3/bin/generic-worker + # path: gopath1.10.4/bin/generic-worker # expires: "{{ '2 weeks' | $fromNow }}" # type: file # mounts: # - cacheName: generic-worker-checkout - # directory: gopath1.10.3/src + # directory: gopath1.10.4/src # - content: - # url: https://storage.googleapis.com/golang/go1.10.3.linux-armv6l.tar.gz + # url: https://storage.googleapis.com/golang/go1.10.4.linux-armv6l.tar.gz # sha256: d3df3fa3d153e81041af24f31a82f86a21cb7b92c1b5552fb621bad0320f06b6 - # directory: go1.10.3 + # directory: go1.10.4 # format: tar.gz @@ -472,13 +472,13 @@ tasks: "${GOPATH}/bin/ineffassign" . 
artifacts: - name: public/build/generic-worker-linux-amd64 - path: gopath1.10.1/bin/generic-worker + path: gopath1.10.4/bin/generic-worker expires: "{{ '2 weeks' | $fromNow }}" type: file mounts: - cacheName: generic-worker-checkout - directory: gopath1.10.1/src + directory: gopath1.10.4/src - content: - url: https://storage.googleapis.com/golang/go1.10.1.linux-amd64.tar.gz - directory: go1.10.1 + url: https://storage.googleapis.com/golang/go1.10.4.linux-amd64.tar.gz + directory: go1.10.4 format: tar.gz diff --git a/.travis.yml b/.travis.yml index 656cca7c..b659a8b9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,6 @@ language: go go: - - "1.10.3" + - "1.10.4" env: - "CGO_ENABLED=0 GIMME_OS=linux GIMME_ARCH=386" diff --git a/artifacts_test.go b/artifacts_test.go index 0dd27ee5..7331d4b1 100644 --- a/artifacts_test.go +++ b/artifacts_test.go @@ -1,3 +1,5 @@ +// +build !docker + package main import ( @@ -13,17 +15,10 @@ import ( "golang.org/x/crypto/openpgp" "golang.org/x/crypto/openpgp/clearsign" - "github.com/taskcluster/slugid-go/slugid" tcclient "github.com/taskcluster/taskcluster-client-go" "github.com/taskcluster/taskcluster-client-go/tcqueue" ) -var ( - // all tests can share taskGroupId so we can view all test tasks in same - // graph later for troubleshooting - taskGroupID = slugid.Nice() -) - func validateArtifacts( t *testing.T, payloadArtifacts []Artifact, diff --git a/aws_test.go b/aws_test.go index 4fb21e8a..828892ee 100644 --- a/aws_test.go +++ b/aws_test.go @@ -1,3 +1,5 @@ +// +build !docker + package main import ( diff --git a/chain_of_trust_all-unix-style.go b/chain_of_trust_all-unix-style.go index ee371da3..8a5fa376 100644 --- a/chain_of_trust_all-unix-style.go +++ b/chain_of_trust_all-unix-style.go @@ -1,4 +1,4 @@ -// +build !windows +// +build !windows,!docker package main diff --git a/chain_of_trust_docker.go b/chain_of_trust_docker.go new file mode 100644 index 00000000..c0c20da7 --- /dev/null +++ b/chain_of_trust_docker.go @@ -0,0 +1,11 @@ +// +build docker + +package main + +func (cot *ChainOfTrustTaskFeature) ensureTaskUserCantReadPrivateCotKey() error { + return nil +} + +func secureSigningKey() error { + return nil +} diff --git a/docker_linux.go b/docker_linux.go deleted file mode 100644 index b206cc2c..00000000 --- a/docker_linux.go +++ /dev/null @@ -1,46 +0,0 @@ -package main - -import ( - "github.com/taskcluster/taskcluster-base-go/scopes" -) - -type DockerFeature struct { -} - -func (feature *DockerFeature) Name() string { - return "Docker" -} - -func (feature *DockerFeature) Initialise() error { - return nil -} - -func (feature *DockerFeature) PersistState() error { - return nil -} - -func (feature *DockerFeature) IsEnabled(task *TaskRun) bool { - return true -} - -type DockerTask struct { - task *TaskRun -} - -func (feature *DockerFeature) NewTaskFeature(task *TaskRun) TaskFeature { - return &DockerTask{ - task: task, - } -} - -func (l *DockerTask) RequiredScopes() scopes.Required { - return scopes.Required{} -} - -func (l *DockerTask) Start() *CommandExecutionError { - return nil -} - -func (l *DockerTask) Stop() *CommandExecutionError { - return nil -} diff --git a/dockerworker/artifact.go b/dockerworker/artifact.go new file mode 100644 index 00000000..70c6624d --- /dev/null +++ b/dockerworker/artifact.go @@ -0,0 +1,151 @@ +// +build docker + +package dockerworker + +import ( + "errors" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/url" + "os" + "strconv" + "time" + + "github.com/cenkalti/backoff" + "github.com/mattetti/filebuffer" + 
"github.com/mitchellh/ioprogress" + "github.com/taskcluster/taskcluster-client-go/tcqueue" + artifact "github.com/taskcluster/taskcluster-lib-artifact-go" +) + +// ScheduleReclaim run a goroutine that will perform reclaims in the background. +func (d *DockerWorker) ScheduleReclaim(claim *tcqueue.TaskClaimResponse) { + takenUntil := claim.TakenUntil + + go func() { + for { + select { + case <-time.After(time.Time(takenUntil).Sub(time.Now())): + reclaim, err := d.queue.ReclaimTask(claim.Status.TaskID, fmt.Sprint(claim.RunID)) + if err != nil { + d.TaskLogger.Printf("Exiting reclaim loop: %v", err) + return + } + takenUntil = reclaim.TakenUntil + case <-d.Context.Done(): + return + } + } + }() +} + +// DownloadArtifact downloads an artifact using exponential backoff algorithm +func (d *DockerWorker) DownloadArtifact(taskID, runID, name string, out io.WriteSeeker) (ret error) { + var u *url.URL + + backoffError := backoff.Retry(func() (err error) { + // rewind out stream + _, ret = out.Seek(0, io.SeekStart) + if ret != nil { + return + } + + if runID == "" { + u, ret = d.queue.GetLatestArtifact_SignedURL(taskID, name, 24*time.Hour) + } else { + u, ret = d.queue.GetArtifact_SignedURL(taskID, runID, name, 24*time.Hour) + } + + if ret != nil { + return + } + + d.TaskLogger.Printf("Downloading %s/%s/%s from %s\n", taskID, runID, name, u.String()) + + // Build a custom request object so we can embed a context into it + var req *http.Request + req, ret = http.NewRequest(http.MethodGet, u.String(), nil) + if ret != nil { + return + } + req = req.WithContext(d.Context) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + fmt.Println(err) + return + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + buff := filebuffer.New([]byte{}) + + // Build the error message with the status code and the response body + errorMessage := fmt.Sprintf("Error downloading %s/%s, status=%s", taskID, name, resp.Status) + if _, err2 := io.Copy(buff, resp.Body); err2 == nil { + errorMessage += "\n" + buff.String() + } + + // For status codes other than 5XX there is no point in issuing a retry + if resp.StatusCode >= 300 && resp.StatusCode < 500 { + ret = errors.New(errorMessage) + } else { + err = errors.New(errorMessage) + } + + return + } + + size, err2 := strconv.Atoi(resp.Header.Get("Content-Length")) + if err2 != nil { + ret = err2 + return + } + + // Depending on the implementation, the body can also issue network requests, + // that's why we need to retry if io.Copy fails + _, err = io.Copy(out, &ioprogress.Reader{ + Reader: resp.Body, + Size: int64(size), + DrawFunc: ioprogress.DrawTerminal(d.LivelogWriter), + }) + + return + }, backoff.WithMaxRetries(backoff.WithContext(backoff.NewExponentialBackOff(), d.Context), 3)) + + if ret == nil { + ret = backoffError + } + + return +} + +// UploadArtifact uploads a new artifact to the task +func (d *DockerWorker) UploadArtifact(taskID, runID, name string, in io.ReadSeeker) (ret error) { + client := artifact.New(d.queue) + + backoffErr := backoff.Retry(func() error { + if _, err := in.Seek(0, io.SeekStart); err != nil { + ret = err + return nil + } + + f, err := ioutil.TempFile(os.TempDir(), "gw") + if err != nil { + return err + } + + defer os.Remove(f.Name()) + defer f.Close() + + return client.Upload(taskID, runID, name, in, f, false, false) + }, backoff.WithMaxRetries(backoff.WithContext(backoff.NewExponentialBackOff(), d.Context), 3)) + + if ret == nil { + ret = backoffErr + } + + return +} diff --git a/dockerworker/artifact_test.go 
b/dockerworker/artifact_test.go new file mode 100644 index 00000000..c3b2931e --- /dev/null +++ b/dockerworker/artifact_test.go @@ -0,0 +1,36 @@ +// +build docker + +package dockerworker + +import ( + "io" + "testing" + + "github.com/mattetti/filebuffer" + "github.com/stretchr/testify/require" +) + +func TestArtifact(t *testing.T) { + d := NewTestDockerWorker(t) + + content := "This is a dummy artifact" + + taskID, err := createDummyArtifact(d, "public/test", content) + require.NoError(t, err) + + // Test Download Latest + buff := filebuffer.New([]byte{}) + require.NoError(t, d.DownloadArtifact(taskID, "", "public/test", buff)) + + _, err = buff.Seek(0, io.SeekStart) + require.NoError(t, err) + require.Equal(t, content, buff.String()) + + // Test Download specific runID + buff = filebuffer.New([]byte{}) + require.NoError(t, d.DownloadArtifact(taskID, "0", "public/test", buff)) + + _, err = buff.Seek(0, io.SeekStart) + require.NoError(t, err) + require.Equal(t, content, buff.String()) +} diff --git a/dockerworker/dockerimage.go b/dockerworker/dockerimage.go new file mode 100644 index 00000000..aab81ba0 --- /dev/null +++ b/dockerworker/dockerimage.go @@ -0,0 +1,145 @@ +// +build docker + +package dockerworker + +import ( + "context" + "encoding/base64" + "fmt" + "io" + "io/ioutil" + "os" + "strings" + + "github.com/DataDog/zstd" + "github.com/cenkalti/backoff" + docker "github.com/fsouza/go-dockerclient" + "github.com/pierrec/lz4" + "github.com/taskcluster/taskcluster-client-go/tcindex" +) + +func MakeImageName(taskID, runID, artifactName string) string { + return strings.ToLower(base64.RawStdEncoding.EncodeToString([]byte(taskID + runID + artifactName))) +} + +// EnsureImage pulls the given image if it doesn't already exist +func EnsureImage(ctx context.Context, cli *docker.Client, imageName string, log io.Writer) (img *docker.Image, err error) { + if img, err = cli.InspectImage(imageName); err == nil { + return + } + + err = backoff.Retry(func() error { + return cli.PullImage(docker.PullImageOptions{ + Repository: imageName, + OutputStream: log, + Context: ctx, + }, docker.AuthConfiguration{}) + }, backoff.WithMaxRetries(backoff.WithContext(backoff.NewExponentialBackOff(), ctx), 3)) + + if err != nil { + return + } + + return cli.InspectImage(imageName) +} + +// LoadArtifactImage loads an image from a task artifact +func (d *DockerWorker) LoadArtifactImage(taskID, runID, name string) (img *docker.Image, err error) { + if runID != "" { + d.TaskLogger.Printf("Loading image %s/%s/%s", taskID, runID, name) + } else { + d.TaskLogger.Printf("Loading image %s/%s", taskID, name) + } + + imageName := MakeImageName(taskID, runID, name) + + // If we already downloaded the image, just return it + img, err = d.Client.InspectImage(imageName) + if err == nil { + return + } + + // The downloaded image is written to a file because it might be + // a huge image + f, err := ioutil.TempFile(os.TempDir(), "image") + if err != nil { + return + } + + defer os.Remove(f.Name()) + defer f.Close() + + if err = d.DownloadArtifact(taskID, runID, name, f); err != nil { + return + } + + // rewind the file pointer to read the downloaded file + if _, err = f.Seek(0, io.SeekStart); err != nil { + return + } + + var r io.Reader + + // We accept the image in three formats: .tar, .tar.lz4 and .tar.zst + switch { + case strings.HasSuffix(name, ".zst"): + rc := zstd.NewReader(f) + defer rc.Close() + r = rc + case strings.HasSuffix(name, ".lz4"): + r = lz4.NewReader(f) + case strings.HasSuffix(name, ".tar"): + r = f + default: + err =
fmt.Errorf("Not supported format for image artifact %s", name) + return + } + + // We need to rename the image name to avoid cache poisoning. + // Again we store it in a temporary file to avoid exhausting + // the memory + t, err := ioutil.TempFile(os.TempDir(), "image.tar") + if err != nil { + return + } + + defer os.Remove(t.Name()) + defer t.Close() + + err = renameDockerImageTarStream(imageName, r, t) + if err != nil { + return + } + + if _, err = t.Seek(0, io.SeekStart); err != nil { + return + } + + d.Client.LoadImage(docker.LoadImageOptions{ + Context: d.Context, + InputStream: t, + OutputStream: d.LivelogWriter, + }) + + img, err = d.Client.InspectImage(imageName) + + return +} + +func (d *DockerWorker) LoadIndexedImage(namespace, path string) (*docker.Image, error) { + d.TaskLogger.Printf("Loading image %s:%s", namespace, path) + + index := tcindex.NewFromEnv() + + task, err := index.FindTask(namespace) + if err != nil { + return nil, err + } + + return d.LoadArtifactImage(task.TaskID, "", path) +} + +func (d *DockerWorker) LoadImage(name string) (*docker.Image, error) { + d.TaskLogger.Printf("Loading image %s", name) + return EnsureImage(d.Context, d.Client, name, d.LivelogWriter) +} diff --git a/dockerworker/dockerimage_test.go b/dockerworker/dockerimage_test.go new file mode 100644 index 00000000..1e5ebb9a --- /dev/null +++ b/dockerworker/dockerimage_test.go @@ -0,0 +1,62 @@ +// +build docker + +package dockerworker + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +const ( + imageTaskID = "Xx0aPfyOTU2o_0FZnr_AJg" + testNamespace = "garbage.docker-worker-tests.docker-images" + exampleImage = "coreos/example:1.0.0" +) + +func loadArtifactImageTest(t *testing.T, artifactName string) { + d := NewTestDockerWorker(t) + + imageName := MakeImageName(imageTaskID, "", artifactName) + + img, err := d.LoadArtifactImage(imageTaskID, "", artifactName) + require.NoError(t, err) + defer d.Client.RemoveImage(img.ID) + + loadedImage, err := d.Client.InspectImage(imageName) + require.NoError(t, err) + require.Equal(t, loadedImage.ID, img.ID) +} + +func TestLoadArtifactImageTar(t *testing.T) { + loadArtifactImageTest(t, "public/image.tar") +} + +func TestLoadArtifactImageZst(t *testing.T) { + loadArtifactImageTest(t, "public/image.tar.zst") +} + +func TestLoadArtifactImageLz4(t *testing.T) { + loadArtifactImageTest(t, "public/image.tar.lz4") +} + +func TestLoadIndexedImage(t *testing.T) { + d := NewTestDockerWorker(t) + + img, err := d.LoadIndexedImage(testNamespace, "public/image.tar") + require.NoError(t, err) + + require.NoError(t, d.Client.RemoveImage(img.ID)) +} + +func TestLoadImage(t *testing.T) { + d := NewTestDockerWorker(t) + + img, err := d.LoadImage(exampleImage) + require.NoError(t, err) + defer d.Client.RemoveImage(img.ID) + + img2, err := d.Client.InspectImage(exampleImage) + require.NoError(t, err) + require.Equal(t, img.ID, img2.ID) +} diff --git a/dockerworker/dockerworker.go b/dockerworker/dockerworker.go new file mode 100644 index 00000000..fb2017dd --- /dev/null +++ b/dockerworker/dockerworker.go @@ -0,0 +1,70 @@ +// +build docker + +package dockerworker + +import ( + "context" + "encoding/json" + "io" + "log" + + docker "github.com/fsouza/go-dockerclient" + "github.com/taskcluster/taskcluster-client-go/tcqueue" + "github.com/xeipuuv/gojsonschema" +) + +type DockerWorker struct { + LivelogWriter io.Writer + Logger *log.Logger + TaskLogger *log.Logger + Context context.Context + queue *tcqueue.Queue + Client *docker.Client +} + +func New(ctx 
context.Context, queue *tcqueue.Queue, cli *docker.Client, taskID string, liveLogWriter io.Writer) *DockerWorker { + return &DockerWorker{ + LivelogWriter: liveLogWriter, + Logger: NewLogger(taskID), + TaskLogger: NewTaskLogger(liveLogWriter), + Context: ctx, + queue: queue, + Client: cli, + } +} + +// ValidatePayload validates the docker worker task payload +func ValidatePayload(payload json.RawMessage) (result *gojsonschema.Result, err error) { + const schema = "https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json" + + schemaLoader := gojsonschema.NewReferenceLoader(schema) + docLoader := gojsonschema.NewStringLoader(string(payload)) + return gojsonschema.Validate(schemaLoader, docLoader) +} + +// CreateContainer creates a new docker container to run a task +func (d *DockerWorker) CreateContainer(env []string, image *docker.Image, command []string, privileged bool) (container *docker.Container, err error) { + return d.Client.CreateContainer(docker.CreateContainerOptions{ + Config: &docker.Config{ + Image: image.ID, + Cmd: command, + Hostname: "", + User: "", + AttachStdin: false, + AttachStdout: true, + AttachStderr: true, + Tty: true, + OpenStdin: false, + StdinOnce: false, + Env: env, + }, + HostConfig: &docker.HostConfig{ + Privileged: privileged, + ShmSize: 1800000000, + ExtraHosts: []string{ + "localhost.localdomain:127.0.0.1", // Bug 1488148 + }, + }, + Context: d.Context, + }) +} diff --git a/dockerworker/log.go b/dockerworker/log.go new file mode 100644 index 00000000..f7a5bd9c --- /dev/null +++ b/dockerworker/log.go @@ -0,0 +1,20 @@ +// +build docker + +package dockerworker + +import ( + "fmt" + "io" + "log" + "os" +) + +// NewLogger returns a new Logger that will write to standard output +func NewLogger(taskID string) *log.Logger { + return log.New(os.Stdout, fmt.Sprintf("[generic-docker-worker taskID=\"%s\"] ", taskID), 0) +} + +// NewTaskLogger returns a logger that will write to the task log +func NewTaskLogger(writer io.Writer) *log.Logger { + return log.New(writer, "", 0) +} diff --git a/dockerworker/renameimage.go b/dockerworker/renameimage.go new file mode 100644 index 00000000..c4877ebd --- /dev/null +++ b/dockerworker/renameimage.go @@ -0,0 +1,155 @@ +// +build docker + +package dockerworker + +import ( + "archive/tar" + "bytes" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "regexp" + + "github.com/pkg/errors" +) + +var allowedDockerLayerIDPattern = regexp.MustCompile(`^[a-fA-F0-9]{64}$`) + +func isAllowedDockerLayerFile(name string) bool { + if len(name) < 64 || !allowedDockerLayerIDPattern.MatchString(name[0:64]) { + return false + } + switch name[64:] { + case ".json", "/", "/json", "/layer.tar", "/VERSION": + return true + default: + return false + } +} + +// renameDockerImageTarStream renames a tar-stream containing a docker image +// in the `docker save` format. The tar-stream is renamed on-the-fly without +// any files being written to disk.
+func renameDockerImageTarStream(imageName string, r io.Reader, w io.Writer) error { + // We call rewriteTarStream to rename the tar-stream on-the-fly + nameWritten := false // check that we found 'repositories' or 'manifest.json' + rwerr := rewriteTarStream(r, w, func(hdr *tar.Header, r io.Reader) (*tar.Header, io.Reader, error) { + // Whenever an entry is reached we switch on the name + switch hdr.Name { + case "repositories", "manifest.json": + nameWritten = true + data, err := ioutil.ReadAll(r) + if err != nil { + return nil, nil, errors.Wrap(err, "failed to read the metadata file from docker image tar-stream") + } + switch hdr.Name { + case "repositories": + data, err = rewriteRepositories(imageName, data) + case "manifest.json": + data, err = rewriteManifest(imageName, data) + default: + panic(errors.New("unreachable")) + } + if err != nil { + return nil, nil, err + } + hdr.Size = int64(len(data)) + return hdr, bytes.NewReader(data), nil + default: + if !isAllowedDockerLayerFile(hdr.Name) { + return nil, nil, fmt.Errorf( + "docker image tar-ball contains illegal file: '%s', consider using an older version of docker", hdr.Name, + ) + } + return hdr, r, nil + } + }) + // If there was no error and the name wasn't written at some point, then we have a problem + if rwerr == nil && !nameWritten { + return fmt.Errorf( + "docker image tar-ball did not contain 'manifest.json' or 'repositories', docker image is invalid", + ) + } + return rwerr +} + +// rewriteRepositories will rewrite the 'repositories' file from a docker image +// tar-stream such that the image has a single tag named :latest +func rewriteRepositories(imageName string, data []byte) ([]byte, error) { + // Rewrite the 'repositories' file by reading it as JSON from the input stream + var repositories map[string]map[string]string + err := json.Unmarshal(data, &repositories) + if err != nil { + return nil, fmt.Errorf( + "the 'repositories' file in the docker image tar-ball is not valid JSON, error: %s", + err.Error(), + ) + } + // Find the layerID + var layerID string + for _, tags := range repositories { + for _, ID := range tags { + // if there are multiple tags pointing to different IDs that's problematic + if layerID == "" { + layerID = ID + } else if layerID != ID { + return nil, fmt.Errorf( + "the 'repositories' file in the docker image tar-ball contains multiple tags with different layer IDs", + ) + } + } + } + // If there is no layerID we have a problem + if layerID == "" { + return nil, fmt.Errorf( + "the 'repositories' file in the docker image tar-ball did not contain any layer identifiers, image is invalid", + ) + } + // Create result of the rewrite and return it + data, err = json.Marshal(map[string]map[string]string{ + imageName: {"latest": layerID}, // map from imageName to layerID + }) + if err != nil { + panic(errors.Wrap(err, "json.Marshal failed on map[string]map[string]string that can't happen")) + } + return data, nil +} + +// Type to read from manifest.json +type manifestEntry struct { + Config string `json:"Config,omitempty"` + RepoTags []string `json:"RepoTags"` + Layers []string `json:"Layers"` +} + +// rewriteManifest will rewrite the 'manifest.json' file from a docker image +// tar-stream such that the image has a single tag named :latest +func rewriteManifest(imageName string, data []byte) ([]byte, error) { + // Read the manifest.json from input + var manifest []manifestEntry + err := json.Unmarshal(data, &manifest) + if err != nil { + return nil, fmt.Errorf( + "the 'manifest.json' file in the docker image tar-ball is
not valid JSON, error: %s", + err.Error(), + ) + } + if len(manifest) == 0 { + return nil, fmt.Errorf( + "the 'manifest.json' file in the docker image tar-ball did not contain any entries", + ) + } + if len(manifest) > 1 { + return nil, fmt.Errorf( + "the 'manifest.json' file in the docker image tar-ball contains more than one entry", + ) + } + // Rewrite the RepoTags and marshal only the first entry as JSON for the result + manifest[0].RepoTags = []string{fmt.Sprintf("%s:latest", imageName)} + data, err = json.Marshal([]manifestEntry{manifest[0]}) + if err != nil { + panic(errors.Wrap(err, "json.Marshal failed on []manifestEntry that can't happen")) + } + return data, nil +} diff --git a/dockerworker/renameimage_test.go b/dockerworker/renameimage_test.go new file mode 100644 index 00000000..90dec38e --- /dev/null +++ b/dockerworker/renameimage_test.go @@ -0,0 +1,33 @@ +// +build docker + +package dockerworker + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestRewriteRepositories(t *testing.T) { + data, err := rewriteRepositories("my-image", []byte( + `{"busybox":{"latest":"374004614a75c2c4afd41a3050b5217e282155eb1eb7b4ce8f22aa9f4b17ee57"}}`, + )) + require.NoError(t, err) + require.Contains(t, string(data), "my-image") + require.NotContains(t, string(data), "busybox") +} + +func TestRewriteManifest(t *testing.T) { + data, err := rewriteManifest("my-image", []byte(`[ + { + "Config": "2b8fd9751c4c0f5dd266fcae00707e67a2545ef34f9a29354585f93dac906749.json", + "RepoTags": ["busybox:latest"], + "Layers": [ + "374004614a75c2c4afd41a3050b5217e282155eb1eb7b4ce8f22aa9f4b17ee57/layer.tar" + ] + } + ]`)) + require.NoError(t, err) + require.Contains(t, string(data), "my-image") + require.NotContains(t, string(data), "busybox") +} diff --git a/dockerworker/tarstream.go b/dockerworker/tarstream.go new file mode 100644 index 00000000..a24f3c72 --- /dev/null +++ b/dockerworker/tarstream.go @@ -0,0 +1,67 @@ +// +build docker + +package dockerworker + +import ( + "archive/tar" + "io" + + "github.com/pkg/errors" +) + +// a tarRewriter is a function that given a tar.Header and io.Reader returns a +// tar.Header and io.Reader to be written in place of the entry given. +// +// If a nil header is returned the entry is skipped; if no transformation is +// desired, the given header and reader can be returned without any changes. +type tarRewriter func(header *tar.Header, r io.Reader) (*tar.Header, io.Reader, error) + +// rewriteTarStream rewrites a tar-stream read from r and written to w. +// +// For each entry in the tar-stream the rewriter function is called, if it +// returns an error then the process is aborted, if it returns a nil header the +// entry is skipped, otherwise the returned header and body are written to the +// output tar-stream. Notice that the rewriter can return the arguments it was +// given, in order to let entries pass through.
+func rewriteTarStream(r io.Reader, w io.Writer, rewriter tarRewriter) error { + // Create a tar.Reader and tar.Writer, so we can move files between the two + tr := tar.NewReader(r) + tw := tar.NewWriter(w) + for { + // Read a tar.Header + hdr, err := tr.Next() + if err == io.EOF { + break // we're done reading + } + if err != nil { + return errors.Wrap(err, "failed to read tar.Reader while rewriting tar-stream") + } + + // Allow the rewriter function to rewrite the entry + var hdr2 *tar.Header + var body io.Reader + hdr2, body, err = rewriter(hdr, tr) + if err != nil { + return err + } + if hdr2 == nil { + continue // skip this entry + } + + // Write tar.Header and copy the body without making any changes + err = tw.WriteHeader(hdr2) + if err != nil { + return errors.Wrap(err, "failed to write a tar.Header, while rewriting tar-stream") + } + // Copy file body to target as well + _, err = io.Copy(tw, body) + if err != nil { + return errors.Wrap(err, "failed to write to tar.Writer, while rewriting tar-stream") + } + } + // Close the tar.Writer, this ensures any cached bytes are flushed and outstanding errors are raised + if err := tw.Close(); err != nil { + return errors.Wrap(err, "failed to close tar.Writer") + } + return nil +} diff --git a/dockerworker/tarstream_test.go b/dockerworker/tarstream_test.go new file mode 100644 index 00000000..3d9a1644 --- /dev/null +++ b/dockerworker/tarstream_test.go @@ -0,0 +1,98 @@ +// +build docker + +package dockerworker + +import ( + "archive/tar" + "bytes" + "crypto/rand" + "errors" + "fmt" + "io" + "io/ioutil" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestRewriteTarStream(t *testing.T) { + // Create a big random blob + blob := make([]byte, 6*1024*1024) + _, err := rand.Read(blob) + require.NoError(t, err) + + // Create tar file + var input = []struct { + Name string + Data []byte + }{ + {"hello.txt", []byte("hello-world")}, + {"rewritten.txt", []byte("data-to-be-rewritten")}, + {"filtered.txt", blob}, + {"to-rename.txt", []byte("data-in-renamed-file")}, + {"blob.bin", blob}, + } + b := bytes.NewBuffer(nil) + tw := tar.NewWriter(b) + for _, f := range input { + err = tw.WriteHeader(&tar.Header{ + Name: f.Name, + Mode: 0600, + Size: int64(len(f.Data)), + }) + require.NoError(t, err) + _, err = tw.Write(f.Data) + require.NoError(t, err) + } + + // Rewrite the tar file as a stream + out := bytes.NewBuffer(nil) + err = rewriteTarStream(b, out, func(hdr *tar.Header, r io.Reader) (*tar.Header, io.Reader, error) { + switch hdr.Name { + case "hello.txt": + return hdr, r, nil + case "rewritten.txt": + data := []byte("data-that-was-rewritten") + hdr.Size = int64(len(data)) + return hdr, bytes.NewReader(data), nil + case "filtered.txt": + return nil, nil, nil + case "to-rename.txt": + hdr.Name = "was-renamed.txt" + return hdr, r, nil + case "blob.bin": + return hdr, r, nil + default: + return nil, nil, errors.New("unhandled file name in test case") + } + }) + require.NoError(t, err) + + // Read the rewritten tar-stream + tr := tar.NewReader(out) + + // declare expected output + var output = []struct { + Name string + Data string + }{ + {"hello.txt", "hello-world"}, + {"rewritten.txt", "data-that-was-rewritten"}, + {"was-renamed.txt", "data-in-renamed-file"}, + {"blob.bin", string(blob)}, + } + for _, f := range output { + fmt.Printf(" - verify %s\n", f.Name) + var hdr *tar.Header + hdr, err = tr.Next() + require.NoError(t, err) + require.Equal(t, f.Name, hdr.Name) + var data []byte + data, err = ioutil.ReadAll(tr) + require.NoError(t, err) +
require.Equal(t, f.Data, string(data)) + } + fmt.Println(" - verify that we have EOF") + _, err = tr.Next() + require.Equal(t, io.EOF, err) +} diff --git a/dockerworker/util.go b/dockerworker/util.go new file mode 100644 index 00000000..c4ae56c7 --- /dev/null +++ b/dockerworker/util.go @@ -0,0 +1,20 @@ +// +build docker + +package dockerworker + +import ( + "context" + "os" + "testing" + + docker "github.com/fsouza/go-dockerclient" + "github.com/stretchr/testify/require" + "github.com/taskcluster/taskcluster-client-go/tcqueue" +) + +// NewTestDockerWorker returns a new DockerWorker object for tests +func NewTestDockerWorker(t *testing.T) *DockerWorker { + cli, err := docker.NewClientFromEnv() + require.NoError(t, err) + return New(context.Background(), tcqueue.NewFromEnv(), cli, "testtaskid", os.Stdout) +} diff --git a/dockerworker/util_test.go b/dockerworker/util_test.go new file mode 100644 index 00000000..61c6d160 --- /dev/null +++ b/dockerworker/util_test.go @@ -0,0 +1,58 @@ +// +build docker + +package dockerworker + +import ( + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/taskcluster/slugid-go/slugid" + tcclient "github.com/taskcluster/taskcluster-client-go" + "github.com/taskcluster/taskcluster-client-go/tcqueue" +) + +func createDummyArtifact(d *DockerWorker, name, content string) (taskID string, err error) { + task := tcqueue.TaskDefinitionRequest{ + Created: tcclient.Time(time.Now()), + Deadline: tcclient.Time(time.Now().Add(10 * time.Minute)), + Expires: tcclient.Time(time.Now().Add(48 * time.Hour)), + Payload: json.RawMessage("{}"), + Metadata: tcqueue.TaskMetadata{ + Description: "generic-worker dummy task", + Name: "generic-worker dummy task", + Owner: "wcosta@mozilla.com", + Source: "https://www.mozilla.org", + }, + Priority: "lowest", + ProvisionerID: "null-provisioner", + WorkerType: "docker-worker", + SchedulerID: "docker-worker-tests", + } + + taskID = slugid.Nice() + + if _, err = d.queue.CreateTask(taskID, &task); err != nil { + return + } + + fmt.Printf("Created task %s\n", taskID) + + claimResp, err := d.queue.ClaimTask(taskID, "0", &tcqueue.TaskClaimRequest{ + WorkerGroup: "docker-worker", + WorkerID: "docker-worker", + }) + + if err != nil { + return + } + + // We cannot call report completed without claiming the task first + defer d.queue.ReportCompleted(taskID, "0") + + d.ScheduleReclaim(claimResp) + + err = d.UploadArtifact(taskID, "0", name, strings.NewReader(content)) + return +} diff --git a/imageid_test.go b/imageid_test.go new file mode 100644 index 00000000..3aa35731 --- /dev/null +++ b/imageid_test.go @@ -0,0 +1,9 @@ +package main + +import "github.com/taskcluster/slugid-go/slugid" + +var ( + // all tests can share taskGroupId so we can view all test tasks in same + // graph later for troubleshooting + taskGroupID = slugid.Nice() +) diff --git a/intermittent_task_test.go b/intermittent_task_test.go index 4209db9b..d9af9f78 100644 --- a/intermittent_task_test.go +++ b/intermittent_task_test.go @@ -1,3 +1,5 @@ +// +build !docker + package main import ( diff --git a/main.go b/main.go index 0f5fe4fa..75f4f872 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,6 @@ //go:generate gw-codegen file://all-unix-style.yml generated_all-unix-style.go !windows //go:generate gw-codegen file://windows.yml generated_windows.go -//go:generate gw-codegen https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/payload.json dockerworker/payload.go +//go:generate gw-codegen 
https://raw.githubusercontent.com/taskcluster/docker-worker/master/schemas/v1/payload.json dockerworker/payload.go package main @@ -28,7 +28,6 @@ import ( "github.com/taskcluster/taskcluster-client-go/tcawsprovisioner" "github.com/taskcluster/taskcluster-client-go/tcpurgecache" "github.com/taskcluster/taskcluster-client-go/tcqueue" - "github.com/xeipuuv/gojsonschema" ) var ( @@ -809,75 +808,6 @@ func ClaimWork() *TaskRun { } } -func (task *TaskRun) validatePayload() *CommandExecutionError { - jsonPayload := task.Definition.Payload - log.Printf("JSON payload: %s", jsonPayload) - schemaLoader := gojsonschema.NewStringLoader(taskPayloadSchema()) - docLoader := gojsonschema.NewStringLoader(string(jsonPayload)) - result, err := gojsonschema.Validate(schemaLoader, docLoader) - if err != nil { - return MalformedPayloadError(err) - } - if !result.Valid() { - task.Error("TASK FAIL since the task payload is invalid. See errors:") - for _, desc := range result.Errors() { - task.Errorf("- %s", desc) - } - // Dealing with Invalid Task Payloads - // ---------------------------------- - // If the task payload is malformed or invalid, keep in mind that the - // queue doesn't validate the contents of the `task.payload` property, - // the worker may resolve the current run by reporting an exception. - // When reporting an exception, using `tcqueue.ReportException` the - // worker should give a `reason`. If the worker is unable execute the - // task specific payload/code/logic, it should report exception with - // the reason `malformed-payload`. - // - // This can also be used if an external resource that is referenced in - // a declarative nature doesn't exist. Generally, it should be used if - // we can be certain that another run of the task will have the same - // result. This differs from `tcqueue.ReportFailed` in the sense that we - // report a failure if the task specific code failed. - // - // Most tasks includes a lot of declarative steps, such as poll a - // docker image, create cache folder, decrypt encrypted environment - // variables, set environment variables and etc. Clearly, if decryption - // of environment variables fail, there is no reason to retry the task. - // Nor can it be said that the task failed, because the error wasn't - // cause by execution of Turing complete code. - // - // If however, we run some executable code referenced in `task.payload` - // and the code crashes or exists non-zero, then the task is said to be - // failed. The difference is whether or not the unexpected behavior - // happened before or after the execution of task specific Turing - // complete code. - return MalformedPayloadError(fmt.Errorf("Validation of payload failed for task %v", task.TaskID)) - } - err = json.Unmarshal(jsonPayload, &task.Payload) - if err != nil { - return MalformedPayloadError(err) - } - for _, artifact := range task.Payload.Artifacts { - // The default artifact expiry is task expiry, but is only applied when - // the task artifacts are resolved. We intentionally don't modify - // task.Payload otherwise it no longer reflects the real data defined - // in the task. 
- if !time.Time(artifact.Expires).IsZero() { - // Don't be too strict: allow 1s discrepancy to account for - // possible timestamp rounding on upstream systems - if time.Time(artifact.Expires).Add(time.Second).Before(time.Time(task.Definition.Deadline)) { - return MalformedPayloadError(fmt.Errorf("Malformed payload: artifact '%v' expires before task deadline (%v is before %v)", artifact.Path, artifact.Expires, task.Definition.Deadline)) - } - // Don't be too strict: allow 1s discrepancy to account for - // possible timestamp rounding on upstream systems - if time.Time(artifact.Expires).After(time.Time(task.Definition.Expires).Add(time.Second)) { - return MalformedPayloadError(fmt.Errorf("Malformed payload: artifact '%v' expires after task expiry (%v is after %v)", artifact.Path, artifact.Expires, task.Definition.Expires)) - } - } - } - return nil -} - type CommandExecutionError struct { TaskStatus TaskStatus Cause error diff --git a/main_test.go b/main_test.go index b512c001..6b6fa421 100644 --- a/main_test.go +++ b/main_test.go @@ -1,3 +1,5 @@ +// +build !docker + package main import ( diff --git a/mounts_test.go b/mounts_test.go index d56c968d..fb57eb76 100644 --- a/mounts_test.go +++ b/mounts_test.go @@ -1,3 +1,5 @@ +// +build !docker + package main import ( diff --git a/os_groups_all-unix-style_test.go b/os_groups_all-unix-style_test.go index f58db001..5d8f9578 100644 --- a/os_groups_all-unix-style_test.go +++ b/os_groups_all-unix-style_test.go @@ -1,4 +1,4 @@ -// +build !windows +// +build !windows,!docker package main diff --git a/os_groups_windows_test.go b/os_groups_windows_test.go index 99da5b2c..b41da91d 100644 --- a/os_groups_windows_test.go +++ b/os_groups_windows_test.go @@ -1,3 +1,5 @@ +// +build !docker + package main import ( diff --git a/payload_test.go b/payload_test.go index bf56cea9..bb58e744 100644 --- a/payload_test.go +++ b/payload_test.go @@ -1,3 +1,5 @@ +// +build !docker + package main import ( diff --git a/plat_all-unix-style.go b/plat_all-unix-style.go index be0abc97..7215bf4f 100644 --- a/plat_all-unix-style.go +++ b/plat_all-unix-style.go @@ -1,4 +1,4 @@ -// +build !windows +// +build !windows,!docker package main diff --git a/plat_darwin.go b/plat_darwin.go index c76555e9..0d9db925 100644 --- a/plat_darwin.go +++ b/plat_darwin.go @@ -1,3 +1,5 @@ +// +build !docker + package main import ( diff --git a/plat_docker.go b/plat_docker.go new file mode 100644 index 00000000..4a670017 --- /dev/null +++ b/plat_docker.go @@ -0,0 +1,153 @@ +// +build docker,!windows + +package main + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "runtime" + "strconv" + "time" + + docker "github.com/fsouza/go-dockerclient" + "github.com/taskcluster/generic-worker/dockerworker" + "github.com/taskcluster/generic-worker/process" + "github.com/taskcluster/shell" +) + +var cli *docker.Client + +func (task *TaskRun) EnvVars() []string { + env := make([]string, 0, len(task.Payload.Env)) + + for key, value := range task.Payload.Env { + env = append(env, fmt.Sprintf("%s=%s", key, value)) + } + + return env +} + +func platformFeatures() []Feature { + return []Feature{} +} + +type PlatformData struct { + Image json.RawMessage +} + +type TaskContext struct { + TaskDir string +} + +func (task *TaskRun) NewPlatformData() (pd *PlatformData, err error) { + return &PlatformData{}, nil +} + +func (pd *PlatformData) ReleaseResources() error { + return nil +} + +func immediateShutdown(cause string) { +} + +func immediateReboot() { +} + +func init() { + var err error + if cli, 
err = docker.NewClientFromEnv(); err != nil { + panic(err) + } + + pwd, err := os.Getwd() + if err != nil { + panic(err) + } + + taskContext = &TaskContext{ + TaskDir: pwd, + } +} + +func (task *TaskRun) prepareCommand(index int) error { + return nil +} + +func (task *TaskRun) generateCommand(index int) error { + var err error + task.Commands[index], err = process.NewCommand( + dockerworker.New(context.Background(), task.Queue, cli, task.TaskID, task.logWriter), + task.Payload.Command[index], + task.PlatformData.Image, + taskContext.TaskDir, + task.EnvVars(), + ) + return err +} + +func purgeOldTasks() error { + return nil +} + +func install(arguments map[string]interface{}) (err error) { + return nil +} + +func makeDirReadableForTaskUser(task *TaskRun, dir string) error { + // No user separation yet + return nil +} + +func makeDirUnreadableForTaskUser(task *TaskRun, dir string) error { + // No user separation yet + return nil +} + +func RenameCrossDevice(oldpath, newpath string) error { + return nil +} + +func (task *TaskRun) formatCommand(index int) string { + return shell.Escape(task.Payload.Command[index]...) +} + +func prepareTaskUser(username string) bool { + return false +} + +func deleteTaskDir(path string) error { + return nil +} + +func defaultTasksDir() string { + // assume all user home directories are in the same folder, i.e. the parent + // folder of the current user's home folder + return filepath.Dir(os.Getenv("HOME")) +} + +// N/A for unix - just a windows thing +func AutoLogonCredentials() (string, string) { + return "", "" +} + +func chooseTaskDirName() string { + return "task_" + strconv.Itoa(int(time.Now().Unix())) +} + +func unsetAutoLogon() { +} + +func deleteTaskDirs() { + //removeTaskDirs(config.TasksDir) +} + +func GrantSIDFullControlOfInteractiveWindowsStationAndDesktop(sid string) (err error) { + return fmt.Errorf( + "Cannot grant %v full control of interactive windows station and desktop; platform %v does not have such entities", + sid, + runtime.GOOS, + ) +} diff --git a/plat_windows.go b/plat_windows.go index 25839b70..7c4dde05 100644 --- a/plat_windows.go +++ b/plat_windows.go @@ -1,3 +1,5 @@ +// +build !docker + package main import ( diff --git a/process/logininfo_windows.go b/process/logininfo_windows.go index f70464b0..0336dd8d 100644 --- a/process/logininfo_windows.go +++ b/process/logininfo_windows.go @@ -1,3 +1,5 @@ +// +build !docker + package process import ( diff --git a/process/process_darwin.go b/process/process_all-unix-style.go similarity index 99% rename from process/process_darwin.go rename to process/process_all-unix-style.go index 3e55e90c..d4ab882a 100644 --- a/process/process_darwin.go +++ b/process/process_all-unix-style.go @@ -1,3 +1,5 @@ +// +build !windows,!docker + package process import ( diff --git a/process/process_docker.go b/process/process_docker.go new file mode 100644 index 00000000..d99a7921 --- /dev/null +++ b/process/process_docker.go @@ -0,0 +1,141 @@ +// +build docker,!windows + +package process + +import ( + "encoding/json" + "fmt" + "io" + "os" + "sync" + "time" + + docker "github.com/fsouza/go-dockerclient" + "github.com/taskcluster/generic-worker/dockerworker" +) + +type Result struct { + SystemError error + exitCode int + Duration time.Duration +} + +type Command struct { + mutex sync.RWMutex + worker *dockerworker.DockerWorker + writer io.Writer + cmd []string + workingDirectory string + image json.RawMessage + env []string +} + +func (c *Command) ensureImage() (img *docker.Image, err error) { + var imageName string
+ var imageArtifact dockerworker.DockerImageArtifact + + if err = json.Unmarshal(c.image, &imageName); err == nil { + img, err = c.worker.LoadImage(imageName) + } else if err = json.Unmarshal(c.image, &imageArtifact); err == nil { + img, err = c.worker.LoadArtifactImage(imageArtifact.TaskID, "", imageArtifact.Path) + } + + return +} + +func (c *Command) DirectOutput(writer io.Writer) { + c.writer = writer +} + +func (c *Command) String() string { + return fmt.Sprintf("%q", c.cmd) +} + +func (c *Command) Execute() (r *Result) { + r = &Result{} + + c.mutex.Lock() + defer c.mutex.Unlock() + + image, err := c.ensureImage() + if err != nil { + r.SystemError = fmt.Errorf("Error downloading image: %v", err) + return + } + + container, err := c.worker.CreateContainer(c.env, image, c.cmd, false) + if err != nil { + r.SystemError = fmt.Errorf("Error creating a new container: %v", err) + return + } + + if err = c.worker.Client.StartContainerWithContext(container.ID, nil, c.worker.Context); err != nil { + r.SystemError = fmt.Errorf("Error starting container: %v", err) + return + } + + started := time.Now() + if r.exitCode, err = c.worker.Client.WaitContainer(container.ID); err != nil { + r.SystemError = fmt.Errorf("Error waiting for container to finish: %v", err) + return + } + + err = c.worker.Client.Logs(docker.LogsOptions{ + Context: c.worker.Context, + Container: container.ID, + OutputStream: c.worker.LivelogWriter, + ErrorStream: c.worker.LivelogWriter, + Stdout: true, + Stderr: true, + RawTerminal: true, + Timestamps: true, + }) + if err != nil { + r.SystemError = fmt.Errorf("Error pulling container logs: %v", err) + } + + finished := time.Now() + r.Duration = finished.Sub(started) + + return +} + +func (r *Result) CrashCause() error { + return r.SystemError +} + +func (r *Result) Crashed() bool { + return r.SystemError != nil +} + +func (r *Result) FailureCause() error { + return fmt.Errorf("Exit code %v", r.exitCode) +} + +func (r *Result) Failed() bool { + return r.exitCode != 0 +} + +func (r *Result) ExitCode() int { + if r.SystemError != nil { + return -2 + } + + return r.exitCode +} + +func NewCommand(worker *dockerworker.DockerWorker, commandLine []string, image json.RawMessage, workingDirectory string, env []string) (*Command, error) { + c := &Command{ + worker: worker, + writer: os.Stdout, + cmd: commandLine, + workingDirectory: workingDirectory, + image: image, + env: env, + } + return c, nil +} + +func (c *Command) Kill() ([]byte, error) { + return []byte{}, nil +} diff --git a/process/process_docker_test.go b/process/process_docker_test.go new file mode 100644 index 00000000..191510fb --- /dev/null +++ b/process/process_docker_test.go @@ -0,0 +1,52 @@ +// +build docker + +package process + +import ( + "encoding/json" + "fmt" + "io" + "strings" + "testing" + + docker "github.com/fsouza/go-dockerclient" + "github.com/mattetti/filebuffer" + "github.com/stretchr/testify/require" + "github.com/taskcluster/generic-worker/dockerworker" +) + +func TestProcess(t *testing.T) { + const message = "Would you kindly?"
+ const imageName = "ubuntu:14.04" + + image, err := json.Marshal(imageName) + require.NoError(t, err) + + d := dockerworker.NewTestDockerWorker(t) + if _, err = d.Client.InspectImage(imageName); err == nil { + require.NoError(t, d.Client.RemoveImageExtended(imageName, docker.RemoveImageOptions{ + Force: true, + Context: d.Context, + })) + } + buff := filebuffer.New([]byte{}) + d.LivelogWriter = buff + + c, err := NewCommand( + d, + []string{"/bin/bash", "-c", "echo $MESSAGE"}, + image, + "/", + []string{fmt.Sprintf("MESSAGE='%s'", message)}, + ) + require.NoError(t, err) + + r := c.Execute() + require.NoError(t, r.SystemError) + require.Equal(t, r.ExitCode(), 0) + + _, err = buff.Seek(0, io.SeekStart) + require.NoError(t, err) + + require.True(t, strings.Contains(buff.String(), message)) +} diff --git a/process/process_linux.go b/process/process_linux.go deleted file mode 100644 index abb7aa3f..00000000 --- a/process/process_linux.go +++ /dev/null @@ -1,139 +0,0 @@ -package process - -import ( - "fmt" - "io" - "os" - "sync" - "time" - - "github.com/docker/docker/api/types" - "github.com/docker/docker/api/types/container" - "github.com/docker/docker/client" - "golang.org/x/net/context" -) - -type Result struct { - SystemError error - ExitCode int64 - Duration time.Duration -} - -type Command struct { - mutex sync.RWMutex - ctx context.Context - cli *client.Client - resp container.ContainerCreateCreatedBody - writer io.Writer - cmd []string - workingDirectory string - env []string -} - -var cli *client.Client - -func init() { - var err error - cli, err = client.NewClientWithOpts(client.WithVersion("1.24")) - if err != nil { - panic(err) - } -} - -func (c *Command) DirectOutput(writer io.Writer) { - c.writer = writer -} - -func (c *Command) String() string { - return fmt.Sprintf("%q", c.cmd) -} - -func (c *Command) Execute() (r *Result) { - - var outPull io.ReadCloser - r = &Result{} - c.mutex.Lock() - defer c.mutex.Unlock() - outPull, r.SystemError = cli.ImagePull(c.ctx, "ubuntu", types.ImagePullOptions{}) - if r.SystemError != nil { - return - } - defer outPull.Close() - io.Copy(c.writer, outPull) - - c.resp, r.SystemError = c.cli.ContainerCreate( - c.ctx, - &container.Config{ - Image: "ubuntu", - Cmd: c.cmd, - WorkingDir: c.workingDirectory, - Env: c.env, - }, - nil, - nil, - "", - ) - if r.SystemError != nil { - return - } - - r.SystemError = c.cli.ContainerStart(c.ctx, c.resp.ID, types.ContainerStartOptions{}) - if r.SystemError != nil { - return - } - - started := time.Now() - res, errch := c.cli.ContainerWait(c.ctx, c.resp.ID, container.WaitConditionNotRunning) - select { - case r.SystemError = <-errch: - if r.SystemError != nil { - return - } - case exitCode := <-res: - r.ExitCode = exitCode.StatusCode - } - - var outLogs io.ReadCloser - outLogs, r.SystemError = c.cli.ContainerLogs(c.ctx, c.resp.ID, types.ContainerLogsOptions{ShowStdout: true}) - if r.SystemError != nil { - return - } - defer outLogs.Close() - io.Copy(c.writer, outLogs) - - finished := time.Now() - r.Duration = finished.Sub(started) - return -} - -func (r *Result) CrashCause() error { - return r.SystemError -} - -func (r *Result) Crashed() bool { - return r.SystemError != nil -} - -func (r *Result) FailureCause() error { - return fmt.Errorf("Exit code %v", r.ExitCode) -} - -func (r *Result) Failed() bool { - return r.ExitCode != 0 -} - -func NewCommand(commandLine []string, workingDirectory string, env []string) (*Command, error) { - c := &Command{ - ctx: context.Background(), - writer: os.Stdout, - cmd: commandLine, 
- workingDirectory: workingDirectory, - cli: cli, - env: env, - } - return c, nil -} - -func (c *Command) Kill() error { - return nil -} diff --git a/process/process_windows.go b/process/process_windows.go index ea9a52dc..b184969e 100644 --- a/process/process_windows.go +++ b/process/process_windows.go @@ -1,3 +1,5 @@ +// +build !docker + package process import ( diff --git a/supersede_test.go b/supersede_test.go index 4f6b9eba..e6ce2765 100644 --- a/supersede_test.go +++ b/supersede_test.go @@ -1,3 +1,5 @@ +// +build !docker + package main import ( diff --git a/taskcluster_proxy_test.go b/taskcluster_proxy_test.go index 71b533ab..4b749abd 100644 --- a/taskcluster_proxy_test.go +++ b/taskcluster_proxy_test.go @@ -1,3 +1,5 @@ +// +build !docker + package main import ( diff --git a/taskstatus_test.go b/taskstatus_test.go index 5b3225df..4553e021 100644 --- a/taskstatus_test.go +++ b/taskstatus_test.go @@ -1,3 +1,5 @@ +// +build !docker + package main import ( diff --git a/validate.go b/validate.go new file mode 100644 index 00000000..5e325f55 --- /dev/null +++ b/validate.go @@ -0,0 +1,81 @@ +// +build !docker + +package main + +import ( + "encoding/json" + "fmt" + "log" + "time" + + "github.com/xeipuuv/gojsonschema" +) + +func (task *TaskRun) validatePayload() *CommandExecutionError { + jsonPayload := task.Definition.Payload + log.Printf("JSON payload: %s", jsonPayload) + schemaLoader := gojsonschema.NewStringLoader(taskPayloadSchema()) + docLoader := gojsonschema.NewStringLoader(string(jsonPayload)) + result, err := gojsonschema.Validate(schemaLoader, docLoader) + if err != nil { + return MalformedPayloadError(err) + } + if !result.Valid() { + task.Error("TASK FAIL since the task payload is invalid. See errors:") + for _, desc := range result.Errors() { + task.Errorf("- %s", desc) + } + // Dealing with Invalid Task Payloads + // ---------------------------------- + // If the task payload is malformed or invalid, keep in mind that the + // queue doesn't validate the contents of the `task.payload` property, + // the worker may resolve the current run by reporting an exception. + // When reporting an exception, using `tcqueue.ReportException` the + // worker should give a `reason`. If the worker is unable to execute the + // task specific payload/code/logic, it should report an exception with + // the reason `malformed-payload`. + // + // This can also be used if an external resource that is referenced in + // a declarative nature doesn't exist. Generally, it should be used if + // we can be certain that another run of the task will have the same + // result. This differs from `tcqueue.ReportFailed` in the sense that we + // report a failure if the task specific code failed. + // + // Most tasks include a lot of declarative steps, such as pulling a + // docker image, creating a cache folder, decrypting encrypted environment + // variables, setting environment variables, etc. Clearly, if decryption + // of environment variables fails, there is no reason to retry the task. + // Nor can it be said that the task failed, because the error wasn't + // caused by execution of Turing complete code. + // + // If however, we run some executable code referenced in `task.payload` + // and the code crashes or exits non-zero, then the task is said to be + // failed. The difference is whether or not the unexpected behavior + // happened before or after the execution of task specific Turing + // complete code.
+ return MalformedPayloadError(fmt.Errorf("Validation of payload failed for task %v", task.TaskID)) + } + err = json.Unmarshal(jsonPayload, &task.Payload) + if err != nil { + return MalformedPayloadError(err) + } + for _, artifact := range task.Payload.Artifacts { + // The default artifact expiry is task expiry, but is only applied when + // the task artifacts are resolved. We intentionally don't modify + // task.Payload otherwise it no longer reflects the real data defined + // in the task. + if !time.Time(artifact.Expires).IsZero() { + // Don't be too strict: allow 1s discrepancy to account for + // possible timestamp rounding on upstream systems + if time.Time(artifact.Expires).Add(time.Second).Before(time.Time(task.Definition.Deadline)) { + return MalformedPayloadError(fmt.Errorf("Malformed payload: artifact '%v' expires before task deadline (%v is before %v)", artifact.Path, artifact.Expires, task.Definition.Deadline)) + } + // Don't be too strict: allow 1s discrepancy to account for + // possible timestamp rounding on upstream systems + if time.Time(artifact.Expires).After(time.Time(task.Definition.Expires).Add(time.Second)) { + return MalformedPayloadError(fmt.Errorf("Malformed payload: artifact '%v' expires after task expiry (%v is after %v)", artifact.Path, artifact.Expires, task.Definition.Expires)) + } + } + } + return nil +} diff --git a/validate_docker.go b/validate_docker.go new file mode 100644 index 00000000..6ba2d294 --- /dev/null +++ b/validate_docker.go @@ -0,0 +1,58 @@ +// +build docker + +package main + +import ( + "encoding/json" + "fmt" + + "github.com/taskcluster/generic-worker/dockerworker" +) + +func dockerWorkerToGenericWorkerPayload(dw *dockerworker.DockerWorkerPayload) *GenericWorkerPayload { + artifacts := make([]Artifact, 0, len(dw.Artifacts)) + for name, artifact := range dw.Artifacts { + artifacts = append(artifacts, Artifact{ + Expires: artifact.Expires, + Name: name, + Path: artifact.Path, + }) + } + + return &GenericWorkerPayload{ + Artifacts: artifacts, + Command: [][]string{dw.Command}, + Env: dw.Env, + MaxRunTime: int64(dw.MaxRunTime), + SupersederURL: dw.SupersederURL, + Features: FeatureFlags{}, + Mounts: []json.RawMessage{}, + OnExitStatus: ExitCodeHandling{}, + OSGroups: []string{}, + } +} + +func (task *TaskRun) validatePayload() *CommandExecutionError { + result, err := dockerworker.ValidatePayload(task.Definition.Payload) + if err != nil { + return MalformedPayloadError(err) + } + + if !result.Valid() { + task.Error("TASK FAIL since the task payload is invalid. See errors:") + for _, desc := range result.Errors() { + task.Errorf("- %s", desc) + } + return MalformedPayloadError(fmt.Errorf("Validation of payload failed for task %v", task.TaskID)) + } + + var payload dockerworker.DockerWorkerPayload + if err = json.Unmarshal(task.Definition.Payload, &payload); err != nil { + return MalformedPayloadError(err) + } + + task.PlatformData.Image = payload.Image + task.Payload = *dockerWorkerToGenericWorkerPayload(&payload) + + return nil +}