From 16f104c01c74fcee73ba126bbf1dc9f5adab9a0b Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Mon, 4 Mar 2019 18:24:06 -0500 Subject: [PATCH 1/2] Default `stable` image stream from RELEASE_IMAGE_LATEST Periodic jobs that want to run a PR from a known `RELEASE_IMAGE_LATEST` need access to the `cli` and `installer` images out of the payload. This fixes problems we have testing older installers and cli binaries against newer payloads and fixes the current break in the release controllers. This is also required for properly testing the installer. Use standard oc commands (that are API forward compatible) to extract the `cli` image and the `installer` image from the payload and set them on stable if RELEASE_IMAGE_LATEST is provided as an input (as they are for the release blocking jobs) or on stable-initial if RELEASE_IMAGE_INITIAL is provided. Hide nested container output because it is confusing when leveraging a pod from within an existing step. --- pkg/steps/artifacts.go | 13 +- pkg/steps/pod.go | 29 ++- pkg/steps/release/create_release.go | 282 ++++++++++++++++++++++++---- pkg/steps/template.go | 21 ++- 4 files changed, 288 insertions(+), 57 deletions(-) diff --git a/pkg/steps/artifacts.go b/pkg/steps/artifacts.go index 2a2cc6dd..6108ff63 100644 --- a/pkg/steps/artifacts.go +++ b/pkg/steps/artifacts.go @@ -15,13 +15,14 @@ import ( "sync" "time" - "k8s.io/apimachinery/pkg/util/sets" + "github.com/golang/glog" coreapi "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" kerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes/scheme" coreclientset "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" @@ -182,7 +183,7 @@ type PodClient interface { } func copyArtifacts(podClient PodClient, into, ns, name, containerName string, paths []string) error { - log.Printf("Copying artifacts from %s into %s", name, into) + 
glog.V(4).Infof("Copying artifacts from %s into %s", name, into) var args []string for _, s := range paths { args = append(args, "-C", s, ".") @@ -253,7 +254,10 @@ func copyArtifacts(podClient PodClient, into, ns, name, containerName string, pa size += h.Size } - if size > 0 { + // If we're updating a substantial amount of artifacts, let the user know as a way to + // indicate why the step took a long amount of time. Conversely, if we just got a small + // number of files this is just noise and can be omitted to not distract from other steps. + if size > 1*1000*1000 { log.Printf("Copied %0.2fMi of artifacts from %s to %s", float64(size)/1000000, name, into) } @@ -651,6 +655,9 @@ func gatherContainerLogsOutput(podClient PodClient, artifactDir, namespace, podN if err != nil { return fmt.Errorf("could not list pod: %v", err) } + if len(list.Items) == 0 { + return nil + } pod := &list.Items[0] if pod.Annotations["ci-operator.openshift.io/save-container-logs"] != "true" { diff --git a/pkg/steps/pod.go b/pkg/steps/pod.go index 69e3934a..9b5a2550 100644 --- a/pkg/steps/pod.go +++ b/pkg/steps/pod.go @@ -21,7 +21,15 @@ import ( const testSecretName = "test-secret" const testSecretDefaultPath = "/usr/test-secrets" +// PodStepConfiguration allows other steps to reuse the pod launching and monitoring +// behavior without reimplementing function. It also enforces conventions like naming, +// directory structure, and input image format. More sophisticated reuse of launching +// pods should use RunPod which is more limited. type PodStepConfiguration struct { + // SkipLogs instructs the step to omit informational logs, such as when the pod is + // part of a larger step like release creation where displaying pod specific info + // is confusing to an end user. Failure logs are still printed. 
+ SkipLogs bool As string From api.ImageStreamTagReference Commands string @@ -48,8 +56,9 @@ func (s *podStep) Inputs(ctx context.Context, dry bool) (api.InputDefinition, er } func (s *podStep) Run(ctx context.Context, dry bool) error { - log.Printf("Executing %s %s", s.name, s.config.As) - + if !s.config.SkipLogs { + log.Printf("Executing %s %s", s.name, s.config.As) + } containerResources, err := resourcesFor(s.resources.RequirementsForStep(s.config.As)) if err != nil { return fmt.Errorf("unable to calculate %s pod resources for %s: %s", s.name, s.config.As, err) @@ -107,8 +116,8 @@ func (s *podStep) Run(ctx context.Context, dry bool) error { s.subTests = testCaseNotifier.SubTests(s.Description() + " - ") }() - if err := waitForPodCompletion(s.podClient.Pods(s.jobSpec.Namespace), pod.Name, testCaseNotifier); err != nil { - return fmt.Errorf("test %q failed: %v", pod.Name, err) + if err := waitForPodCompletion(s.podClient.Pods(s.jobSpec.Namespace), pod.Name, testCaseNotifier, s.config.SkipLogs); err != nil { + return fmt.Errorf("%s %q failed: %v", s.name, pod.Name, err) } return nil } @@ -267,3 +276,15 @@ func getSecretVolumeMountFromSecret(secretMountPath string) []coreapi.VolumeMoun }, } } + +// RunPod may be used to run a pod to completion. Provides a simpler interface than +// PodStep and is intended for other steps that may need to run transient actions. +// This pod will not be able to gather artifacts, nor will it report log messages +// unless it fails. 
+func RunPod(podClient PodClient, pod *coreapi.Pod) error { + pod, err := createOrRestartPod(podClient.Pods(pod.Namespace), pod) + if err != nil { + return err + } + return waitForPodCompletion(podClient.Pods(pod.Namespace), pod.Name, nil, true) +} diff --git a/pkg/steps/release/create_release.go b/pkg/steps/release/create_release.go index e1ea4313..92f6f94c 100644 --- a/pkg/steps/release/create_release.go +++ b/pkg/steps/release/create_release.go @@ -2,28 +2,50 @@ package release import ( "context" + "encoding/json" "fmt" + "io/ioutil" "log" "os" + "path/filepath" "strings" "time" coreapi "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + rbacclientset "k8s.io/client-go/kubernetes/typed/rbac/v1" + "k8s.io/client-go/util/retry" imageapi "github.com/openshift/api/image/v1" imageclientset "github.com/openshift/client-go/image/clientset/versioned/typed/image/v1" - "k8s.io/apimachinery/pkg/api/errors" - meta "k8s.io/apimachinery/pkg/apis/meta/v1" - rbacclientset "k8s.io/client-go/kubernetes/typed/rbac/v1" "github.com/openshift/ci-operator/pkg/api" "github.com/openshift/ci-operator/pkg/steps" ) -// assembleReleaseStep knows how to build an update payload image for -// an OpenShift release by waiting for the full release image set to be -// created, then invoking the admin command for building a new release. +// assembleReleaseStep is responsible for creating release images from +// the stable or stable-initial image streams for use with tests that need +// to install or upgrade a cluster. It uses the `cli` image within the +// image stream to create the image and pushes it to a `release` image stream +// at the `latest` or `initial` tags. As output it provides the environment +// variables RELEASE_IMAGE_(LATEST|INITIAL) which can be used by templates +// that invoke the installer. 
+// +// Since release images describe a set of images, when a user provides +// RELEASE_IMAGE_INITIAL or RELEASE_IMAGE_LATEST as inputs to the ci-operator +// job we treat those as inputs we must expand into the `stable-initial` or +// `stable` image streams. This is because our test scenarios need access not +// just to the release image, but also to the images in that release image +// like installer, cli, or tests. To make it easy for a CI job to install from +// an older release image, we need to extract the 'installer' image into the +// same location that we would expect if it came from a tag_specification. +// The images inside of a release image override any images built or imported +// into the job, which allows you to have an empty tag_specification and +// inject the images from a known historic release for the purposes of building +// branches of those releases. type assembleReleaseStep struct { config api.ReleaseTagConfiguration latest bool @@ -43,42 +65,12 @@ func (s *assembleReleaseStep) Inputs(ctx context.Context, dry bool) (api.InputDe } func (s *assembleReleaseStep) Run(ctx context.Context, dry bool) error { - // if we receive an input, we tag it in instead of generating it - providedImage := os.Getenv(s.envVar()) - if len(providedImage) > 0 { - log.Printf("Setting release image %s to %s", s.tag(), providedImage) - if _, err := s.imageClient.ImageStreamTags(s.jobSpec.Namespace).Update(&imageapi.ImageStreamTag{ - ObjectMeta: meta.ObjectMeta{ - Name: fmt.Sprintf("release:%s", s.tag()), - }, - Tag: &imageapi.TagReference{ - From: &coreapi.ObjectReference{ - Kind: "DockerImage", - Name: providedImage, - }, - }, - }); err != nil { - return err - } - if err := wait.PollImmediate(time.Second, time.Minute, func() (bool, error) { - is, err := s.imageClient.ImageStreams(s.jobSpec.Namespace).Get("release", meta.GetOptions{}) - if err != nil { - return false, err - } - ref, _ := findStatusTag(is, s.tag()) - return ref != nil, nil - }); err != nil { - return 
fmt.Errorf("unable to import %s release image: %v", s.tag(), err) - } - return nil - } - tag := s.tag() - var streamName string - if s.latest { - streamName = api.StableImageStream - } else { - streamName = fmt.Sprintf("%s-initial", api.StableImageStream) + streamName := s.streamName() + + // if we receive an input, we tag it in instead of generating it + if providedImage := os.Getenv(s.envVar()); len(providedImage) > 0 { + return s.importFromReleaseImage(ctx, dry, providedImage) } stable, err := s.imageClient.ImageStreams(s.jobSpec.Namespace).Get(streamName, meta.GetOptions{}) @@ -145,6 +137,207 @@ oc adm release extract --from=%q --to=/tmp/artifacts/release-payload-%s return step.Run(ctx, dry) } +// importFromReleaseImage uses the provided release image and updates the stable / release streams as +// appropriate with the contents of the payload so that downstream components are using the correct images. +// The most common case is to use the correct installer image, tests, and cli commands. 
+func (s *assembleReleaseStep) importFromReleaseImage(ctx context.Context, dry bool, providedImage string) error { + tag := s.tag() + streamName := s.streamName() + + if dry { + return nil + } + + start := time.Now() + + // create the stable image stream with lookup policy + _, err := s.imageClient.ImageStreams(s.jobSpec.Namespace).Create(&imageapi.ImageStream{ + ObjectMeta: meta.ObjectMeta{ + Name: streamName, + }, + Spec: imageapi.ImageStreamSpec{ + LookupPolicy: imageapi.ImageLookupPolicy{ + Local: true, + }, + }, + }) + if err != nil && !errors.IsAlreadyExists(err) { + return fmt.Errorf("could not create stable imagestreamtag: %v", err) + } + // tag the release image in and let it import + if _, err := s.imageClient.ImageStreamTags(s.jobSpec.Namespace).Update(&imageapi.ImageStreamTag{ + ObjectMeta: meta.ObjectMeta{ + Name: fmt.Sprintf("release:%s", tag), + }, + Tag: &imageapi.TagReference{ + From: &coreapi.ObjectReference{ + Kind: "DockerImage", + Name: providedImage, + }, + }, + }); err != nil { + return err + } + // wait for the release image to be available + var pullSpec string + if err := wait.PollImmediate(time.Second, time.Minute, func() (bool, error) { + is, err := s.imageClient.ImageStreams(s.jobSpec.Namespace).Get("release", meta.GetOptions{}) + if err != nil { + return false, err + } + ref, _ := findStatusTag(is, tag) + if ref == nil { + return false, nil + } + var registry string + if len(is.Status.PublicDockerImageRepository) > 0 { + registry = is.Status.PublicDockerImageRepository + } else if len(is.Status.DockerImageRepository) > 0 { + registry = is.Status.DockerImageRepository + } else { + return false, fmt.Errorf("image stream %s has no accessible image registry value", is.Name) + } + pullSpec = fmt.Sprintf("%s:%s", registry, tag) + return true, nil + }); err != nil { + return fmt.Errorf("unable to import %s release image: %v", tag, err) + } + + // override anything in stable with the contents of the release image + // TODO: should we allow 
underride for things we built in pipeline? + artifactDir := s.artifactDir + if len(artifactDir) == 0 { + var err error + artifactDir, err = ioutil.TempDir("", "payload-images") + if err != nil { + return fmt.Errorf("unable to create temporary artifact dir for payload extraction") + } + } + + // get the CLI image from the payload (since we need it to run oc adm release extract) + target := fmt.Sprintf("release-images-%s", tag) + targetCLI := fmt.Sprintf("%s-cli", target) + if err := steps.RunPod(s.podClient, &coreapi.Pod{ + ObjectMeta: meta.ObjectMeta{ + Name: targetCLI, + Namespace: s.jobSpec.Namespace, + }, + Spec: coreapi.PodSpec{ + RestartPolicy: coreapi.RestartPolicyNever, + Containers: []coreapi.Container{ + { + Name: "release", + Image: pullSpec, + Command: []string{"/bin/sh", "-c", "cluster-version-operator image cli > /dev/termination-log"}, + }, + }, + }, + }); err != nil { + return fmt.Errorf("unable to find the 'cli' image in the provided release image: %v", err) + } + pod, err := s.podClient.Pods(s.jobSpec.Namespace).Get(targetCLI, meta.GetOptions{}) + if err != nil { + return fmt.Errorf("unable to extract the 'cli' image from the release image: %v", err) + } + if len(pod.Status.ContainerStatuses) == 0 || pod.Status.ContainerStatuses[0].State.Terminated == nil { + return fmt.Errorf("unable to extract the 'cli' image from the release image: %v", err) + } + cliImage := pod.Status.ContainerStatuses[0].State.Terminated.Message + + // tag the cli image into stable so we use the correct pull secrets from the namespace + if _, err := s.imageClient.ImageStreamTags(s.jobSpec.Namespace).Update(&imageapi.ImageStreamTag{ + ObjectMeta: meta.ObjectMeta{ + Name: fmt.Sprintf("%s:cli", streamName), + }, + Tag: &imageapi.TagReference{ + From: &coreapi.ObjectReference{ + Kind: "DockerImage", + Name: cliImage, + }, + }, + }); err != nil { + return err + } + + // run adm release extract and grab the raw image-references from the payload + podConfig := 
steps.PodStepConfiguration{ + SkipLogs: true, + As: target, + From: api.ImageStreamTagReference{ + Name: api.StableImageStream, + Tag: "cli", + }, + ServiceAccountName: "builder", + ArtifactDir: "/tmp/artifacts", + Commands: fmt.Sprintf(` +set -euo pipefail +export HOME=/tmp +oc registry login +oc adm release extract --from=%q --file=image-references > /tmp/artifacts/%s +`, pullSpec, target), + } + + // set an explicit default for release-latest resources, but allow customization if necessary + resources := s.resources + if _, ok := resources[podConfig.As]; !ok { + copied := make(api.ResourceConfiguration) + for k, v := range resources { + copied[k] = v + } + // max cpu observed at 0.1 core, most memory ~ 420M + copied[podConfig.As] = api.ResourceRequirements{Requests: api.ResourceList{"cpu": "50m", "memory": "400Mi"}} + resources = copied + } + step := steps.PodStep("release", podConfig, resources, s.podClient, artifactDir, s.jobSpec) + if err := step.Run(ctx, false); err != nil { + return err + } + + // read the contents from the artifacts directory + isContents, err := ioutil.ReadFile(filepath.Join(artifactDir, podConfig.As, target)) + if err != nil { + return fmt.Errorf("unable to read release image stream: %v", err) + } + var releaseIS imageapi.ImageStream + if err := json.Unmarshal(isContents, &releaseIS); err != nil { + return fmt.Errorf("unable to decode release image stream: %v", err) + } + if releaseIS.Kind != "ImageStream" || releaseIS.APIVersion != "image.openshift.io/v1" { + return fmt.Errorf("unexpected image stream contents: %v", err) + } + + // update the stable image stream to have all of the tags from the payload + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + stable, err := s.imageClient.ImageStreams(s.jobSpec.Namespace).Get(streamName, meta.GetOptions{}) + if err != nil { + return fmt.Errorf("could not resolve imagestream %s: %v", streamName, err) + } + + existing := sets.NewString() + tags := make([]imageapi.TagReference, 
0, len(releaseIS.Spec.Tags)+len(stable.Spec.Tags)) + for _, tag := range releaseIS.Spec.Tags { + existing.Insert(tag.Name) + tags = append(tags, tag) + } + for _, tag := range stable.Spec.Tags { + if existing.Has(tag.Name) { + continue + } + existing.Insert(tag.Name) + tags = append(tags, tag) + } + stable.Spec.Tags = tags + + _, err = s.imageClient.ImageStreams(s.jobSpec.Namespace).Update(stable) + return err + }); err != nil { + return fmt.Errorf("unable to update stable image stream with release tags: %v", err) + } + + log.Printf("Imported %s to release:%s and updated %s images in %s", providedImage, tag, streamName, time.Now().Sub(start).Truncate(time.Second)) + return nil +} + func (s *assembleReleaseStep) Done() (bool, error) { // TODO: define done return true, nil @@ -172,6 +365,13 @@ func (s *assembleReleaseStep) tag() string { return "initial" } +func (s *assembleReleaseStep) streamName() string { + if s.latest { + return api.StableImageStream + } + return fmt.Sprintf("%s-initial", api.StableImageStream) +} + func (s *assembleReleaseStep) envVar() string { return fmt.Sprintf("RELEASE_IMAGE_%s", strings.ToUpper(s.tag())) } diff --git a/pkg/steps/template.go b/pkg/steps/template.go index a859d2df..83438782 100644 --- a/pkg/steps/template.go +++ b/pkg/steps/template.go @@ -156,7 +156,7 @@ func (s *templateExecutionStep) Run(ctx context.Context, dry bool) error { for _, ref := range instance.Status.Objects { switch { case ref.Ref.Kind == "Pod" && ref.Ref.APIVersion == "v1": - err := waitForPodCompletion(s.podClient.Pods(s.jobSpec.Namespace), ref.Ref.Name, testCaseNotifier) + err := waitForPodCompletion(s.podClient.Pods(s.jobSpec.Namespace), ref.Ref.Name, testCaseNotifier, false) s.subTests = append(s.subTests, testCaseNotifier.SubTests(fmt.Sprintf("%s - %s ", s.Description(), ref.Ref.Name))...) 
if err != nil { return fmt.Errorf("template pod %q failed: %v", ref.Ref.Name, err) @@ -446,11 +446,10 @@ func waitForCompletedPodDeletion(podClient coreclientset.PodInterface, name stri return waitForPodDeletion(podClient, name, uid) } -func waitForPodCompletion(podClient coreclientset.PodInterface, name string, notifier ContainerNotifier) error { +func waitForPodCompletion(podClient coreclientset.PodInterface, name string, notifier ContainerNotifier, skipLogs bool) error { if notifier == nil { notifier = NopNotifier } - skipLogs := false completed := make(map[string]time.Time) for { retry, err := waitForPodCompletionOrTimeout(podClient, name, completed, notifier, skipLogs) @@ -494,7 +493,7 @@ func waitForPodCompletionOrTimeout(podClient coreclientset.PodInterface, name st if pod.Spec.RestartPolicy == coreapi.RestartPolicyAlways { return false, nil } - podLogNewFailedContainers(podClient, pod, completed, notifier) + podLogNewFailedContainers(podClient, pod, completed, notifier, skipLogs) if podJobIsOK(pod) { if !skipLogs { log.Printf("Pod %s already succeeded in %s", pod.Name, podDuration(pod).Truncate(time.Second)) @@ -512,9 +511,11 @@ func waitForPodCompletionOrTimeout(podClient coreclientset.PodInterface, name st return true, nil } if pod, ok := event.Object.(*coreapi.Pod); ok { - podLogNewFailedContainers(podClient, pod, completed, notifier) + podLogNewFailedContainers(podClient, pod, completed, notifier, skipLogs) if podJobIsOK(pod) { - log.Printf("Pod %s succeeded after %s", pod.Name, podDuration(pod).Truncate(time.Second)) + if !skipLogs { + log.Printf("Pod %s succeeded after %s", pod.Name, podDuration(pod).Truncate(time.Second)) + } return false, nil } if podJobIsFailed(pod) { @@ -523,7 +524,7 @@ func waitForPodCompletionOrTimeout(podClient coreclientset.PodInterface, name st continue } if event.Type == watch.Deleted { - podLogNewFailedContainers(podClient, pod, completed, notifier) + podLogNewFailedContainers(podClient, pod, completed, notifier, skipLogs) 
return false, appendLogToError(fmt.Errorf("the pod %s/%s was deleted without completing after %s (failed containers: %s)", pod.Namespace, pod.Name, podDuration(pod).Truncate(time.Second), strings.Join(failedContainerNames(pod), ", ")), podMessages(pod)) } log.Printf("error: Unrecognized event in watch: %v %#v", event.Type, event.Object) @@ -680,7 +681,7 @@ func failedContainerNames(pod *coreapi.Pod) []string { return names } -func podLogNewFailedContainers(podClient coreclientset.PodInterface, pod *coreapi.Pod, completed map[string]time.Time, notifier ContainerNotifier) { +func podLogNewFailedContainers(podClient coreclientset.PodInterface, pod *coreapi.Pod, completed map[string]time.Time, notifier ContainerNotifier, skipLogs bool) { var statuses []coreapi.ContainerStatus statuses = append(statuses, pod.Status.InitContainerStatuses...) statuses = append(statuses, pod.Status.ContainerStatuses...) @@ -697,7 +698,9 @@ func podLogNewFailedContainers(podClient coreclientset.PodInterface, pod *coreap notifier.Notify(pod, status.Name) if s.ExitCode == 0 { - log.Printf("Container %s in pod %s completed successfully", status.Name, pod.Name) + if !skipLogs { + log.Printf("Container %s in pod %s completed successfully", status.Name, pod.Name) + } continue } From 2cfaa7c1dfc854b0c939bd3d8ea750302dff7b55 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Wed, 6 Mar 2019 17:48:39 -0500 Subject: [PATCH 2/2] When the user specifies IMAGE_FORMAT we should do nothing The release jobs set IMAGE_FORMAT to bypass creating images, and when a job requests RELEASE_IMAGE_INITIAL but doesn't set it to something we should skip creating the initial image. 
--- pkg/steps/release/create_release.go | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/pkg/steps/release/create_release.go b/pkg/steps/release/create_release.go index 92f6f94c..29b2d002 100644 --- a/pkg/steps/release/create_release.go +++ b/pkg/steps/release/create_release.go @@ -68,18 +68,29 @@ func (s *assembleReleaseStep) Run(ctx context.Context, dry bool) error { tag := s.tag() streamName := s.streamName() - // if we receive an input, we tag it in instead of generating it - if providedImage := os.Getenv(s.envVar()); len(providedImage) > 0 { + // if the user specified an input env var, we tag it in instead of generating it + providedImage, ok := os.LookupEnv(s.envVar()) + if ok { + if len(providedImage) == 0 { + log.Printf("No %s release image necessary", tag) + return nil + } return s.importFromReleaseImage(ctx, dry, providedImage) } stable, err := s.imageClient.ImageStreams(s.jobSpec.Namespace).Get(streamName, meta.GetOptions{}) if err != nil { + if errors.IsNotFound(err) { + // if a user sets IMAGE_FORMAT=... we skip importing the image stream contents, which prevents us from + // generating a release image. + log.Printf("No %s release image can be generated when the %s image stream was skipped", tag, streamName) + return nil + } return fmt.Errorf("could not resolve imagestream %s: %v", streamName, err) } cvo, ok := resolvePullSpec(stable, "cluster-version-operator", true) if !ok { - log.Printf("No release image necessary, stable image stream does not include a cluster-version-operator image") + log.Printf("No %s release image necessary, %s image stream does not include a cluster-version-operator image", tag, streamName) return nil } if _, ok := resolvePullSpec(stable, "cli", true); !ok {