diff --git a/pkg/steps/artifacts.go b/pkg/steps/artifacts.go
index 2a2cc6dd..6108ff63 100644
--- a/pkg/steps/artifacts.go
+++ b/pkg/steps/artifacts.go
@@ -15,13 +15,14 @@ import (
 	"sync"
 	"time"
 
-	"k8s.io/apimachinery/pkg/util/sets"
+	"github.com/golang/glog"
 
 	coreapi "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	kerrors "k8s.io/apimachinery/pkg/util/errors"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/client-go/kubernetes/scheme"
 	coreclientset "k8s.io/client-go/kubernetes/typed/core/v1"
 	"k8s.io/client-go/rest"
@@ -182,7 +183,7 @@ type PodClient interface {
 }
 
 func copyArtifacts(podClient PodClient, into, ns, name, containerName string, paths []string) error {
-	log.Printf("Copying artifacts from %s into %s", name, into)
+	glog.V(4).Infof("Copying artifacts from %s into %s", name, into)
 	var args []string
 	for _, s := range paths {
 		args = append(args, "-C", s, ".")
@@ -253,7 +254,10 @@ func copyArtifacts(podClient PodClient, into, ns, name, containerName string, pa
 		size += h.Size
 	}
 
-	if size > 0 {
+	// If we copied a substantial amount of artifacts, let the user know so they can see
+	// why the step took a long time. Conversely, if we only copied a small number of
+	// files, this is just noise and can be omitted so it does not distract from other steps.
+	if size > 1*1000*1000 {
 		log.Printf("Copied %0.2fMi of artifacts from %s to %s", float64(size)/1000000, name, into)
 	}
 
@@ -651,6 +655,9 @@ func gatherContainerLogsOutput(podClient PodClient, artifactDir, namespace, podN
 	if err != nil {
 		return fmt.Errorf("could not list pod: %v", err)
 	}
+	if len(list.Items) == 0 {
+		return nil
+	}
 	pod := &list.Items[0]
 
 	if pod.Annotations["ci-operator.openshift.io/save-container-logs"] != "true" {
diff --git a/pkg/steps/pod.go b/pkg/steps/pod.go
index 69e3934a..9b5a2550 100644
--- a/pkg/steps/pod.go
+++ b/pkg/steps/pod.go
@@ -21,7 +21,15 @@ import (
 const testSecretName = "test-secret"
 const testSecretDefaultPath = "/usr/test-secrets"
 
+// PodStepConfiguration allows other steps to reuse the pod launching and monitoring
+// behavior without reimplementing the same functionality. It also enforces conventions
+// like naming, directory structure, and input image format. Steps that only need to run
+// a transient pod to completion should use RunPod, which is more limited.
 type PodStepConfiguration struct {
+	// SkipLogs instructs the step to omit informational logs, such as when the pod is
+	// part of a larger step like release creation, where displaying pod-specific info
+	// is confusing to an end user. Failure logs are still printed.
+	SkipLogs           bool
 	As                 string
 	From               api.ImageStreamTagReference
 	Commands           string
@@ -48,8 +56,9 @@ func (s *podStep) Inputs(ctx context.Context, dry bool) (api.InputDefinition, er
 }
 
 func (s *podStep) Run(ctx context.Context, dry bool) error {
-	log.Printf("Executing %s %s", s.name, s.config.As)
-
+	if !s.config.SkipLogs {
+		log.Printf("Executing %s %s", s.name, s.config.As)
+	}
 	containerResources, err := resourcesFor(s.resources.RequirementsForStep(s.config.As))
 	if err != nil {
 		return fmt.Errorf("unable to calculate %s pod resources for %s: %s", s.name, s.config.As, err)
@@ -107,8 +116,8 @@ func (s *podStep) Run(ctx context.Context, dry bool) error {
 		s.subTests = testCaseNotifier.SubTests(s.Description() + " - ")
 	}()
 
-	if err := waitForPodCompletion(s.podClient.Pods(s.jobSpec.Namespace), pod.Name, testCaseNotifier); err != nil {
-		return fmt.Errorf("test %q failed: %v", pod.Name, err)
+	if err := waitForPodCompletion(s.podClient.Pods(s.jobSpec.Namespace), pod.Name, testCaseNotifier, s.config.SkipLogs); err != nil {
+		return fmt.Errorf("%s %q failed: %v", s.name, pod.Name, err)
 	}
 	return nil
 }
@@ -267,3 +276,15 @@ func getSecretVolumeMountFromSecret(secretMountPath string) []coreapi.VolumeMoun
 		},
 	}
 }
+
+// RunPod may be used to run a pod to completion. Provides a simpler interface than
+// PodStep and is intended for other steps that may need to run transient actions.
+// This pod will not be able to gather artifacts, nor will it report log messages
+// unless it fails.
+func RunPod(podClient PodClient, pod *coreapi.Pod) error {
+	pod, err := createOrRestartPod(podClient.Pods(pod.Namespace), pod)
+	if err != nil {
+		return err
+	}
+	return waitForPodCompletion(podClient.Pods(pod.Namespace), pod.Name, nil, true)
+}
diff --git a/pkg/steps/release/create_release.go b/pkg/steps/release/create_release.go
index e1ea4313..29b2d002 100644
--- a/pkg/steps/release/create_release.go
+++ b/pkg/steps/release/create_release.go
@@ -2,28 +2,50 @@ package release
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
+	"io/ioutil"
 	"log"
 	"os"
+	"path/filepath"
 	"strings"
 	"time"
 
 	coreapi "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
+	rbacclientset "k8s.io/client-go/kubernetes/typed/rbac/v1"
+	"k8s.io/client-go/util/retry"
 
 	imageapi "github.com/openshift/api/image/v1"
 	imageclientset "github.com/openshift/client-go/image/clientset/versioned/typed/image/v1"
-	"k8s.io/apimachinery/pkg/api/errors"
-	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
-	rbacclientset "k8s.io/client-go/kubernetes/typed/rbac/v1"
 
 	"github.com/openshift/ci-operator/pkg/api"
 	"github.com/openshift/ci-operator/pkg/steps"
 )
 
-// assembleReleaseStep knows how to build an update payload image for
-// an OpenShift release by waiting for the full release image set to be
-// created, then invoking the admin command for building a new release.
+// assembleReleaseStep is responsible for creating release images from
+// the stable or stable-initial image streams for use with tests that need
+// to install or upgrade a cluster. It uses the `cli` image within the
+// image stream to create the image and pushes it to a `release` image stream
+// at the `latest` or `initial` tags. As output it provides the environment
+// variables RELEASE_IMAGE_(LATEST|INITIAL) which can be used by templates
+// that invoke the installer.
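+// For example, an installer-based test template can consume RELEASE_IMAGE_LATEST to
+// select the payload it installs.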
+//
+// Since release images describe a set of images, when a user provides
+// RELEASE_IMAGE_INITIAL or RELEASE_IMAGE_LATEST as inputs to the ci-operator
+// job we treat those as inputs we must expand into the `stable-initial` or
+// `stable` image streams. This is because our test scenarios need access not
+// just to the release image, but also to the images in that release image
+// like installer, cli, or tests. To make it easy for a CI job to install from
+// an older release image, we need to extract the 'installer' image into the
+// same location that we would expect if it came from a tag_specification.
+// The images inside of a release image override any images built or imported
+// into the job, which allows you to have an empty tag_specification and
+// inject the images from a known historic release for the purposes of building
+// branches of those releases.
 type assembleReleaseStep struct {
 	config      api.ReleaseTagConfiguration
 	latest      bool
@@ -43,51 +65,32 @@ func (s *assembleReleaseStep) Inputs(ctx context.Context, dry bool) (api.InputDe
 }
 
 func (s *assembleReleaseStep) Run(ctx context.Context, dry bool) error {
-	// if we receive an input, we tag it in instead of generating it
-	providedImage := os.Getenv(s.envVar())
-	if len(providedImage) > 0 {
-		log.Printf("Setting release image %s to %s", s.tag(), providedImage)
-		if _, err := s.imageClient.ImageStreamTags(s.jobSpec.Namespace).Update(&imageapi.ImageStreamTag{
-			ObjectMeta: meta.ObjectMeta{
-				Name: fmt.Sprintf("release:%s", s.tag()),
-			},
-			Tag: &imageapi.TagReference{
-				From: &coreapi.ObjectReference{
-					Kind: "DockerImage",
-					Name: providedImage,
-				},
-			},
-		}); err != nil {
-			return err
-		}
-		if err := wait.PollImmediate(time.Second, time.Minute, func() (bool, error) {
-			is, err := s.imageClient.ImageStreams(s.jobSpec.Namespace).Get("release", meta.GetOptions{})
-			if err != nil {
-				return false, err
-			}
-			ref, _ := findStatusTag(is, s.tag())
-			return ref != nil, nil
-		}); err != nil {
-			return fmt.Errorf("unable to import %s release image: %v", s.tag(), err)
-		}
-		return nil
-	}
-
 	tag := s.tag()
-	var streamName string
-	if s.latest {
-		streamName = api.StableImageStream
-	} else {
-		streamName = fmt.Sprintf("%s-initial", api.StableImageStream)
+	streamName := s.streamName()
+
+	// if the user specified an input env var, we tag it in instead of generating it
+	providedImage, ok := os.LookupEnv(s.envVar())
+	if ok {
+		if len(providedImage) == 0 {
+			log.Printf("No %s release image necessary", tag)
+			return nil
+		}
+		return s.importFromReleaseImage(ctx, dry, providedImage)
 	}
 
 	stable, err := s.imageClient.ImageStreams(s.jobSpec.Namespace).Get(streamName, meta.GetOptions{})
 	if err != nil {
+		if errors.IsNotFound(err) {
+			// if a user sets IMAGE_FORMAT=... we skip importing the image stream contents, which prevents us from
+			// generating a release image.
+ log.Printf("No %s release image can be generated when the %s image stream was skipped", tag, streamName) + return nil + } return fmt.Errorf("could not resolve imagestream %s: %v", streamName, err) } cvo, ok := resolvePullSpec(stable, "cluster-version-operator", true) if !ok { - log.Printf("No release image necessary, stable image stream does not include a cluster-version-operator image") + log.Printf("No %s release image necessary, %s image stream does not include a cluster-version-operator image", tag, streamName) return nil } if _, ok := resolvePullSpec(stable, "cli", true); !ok { @@ -145,6 +148,207 @@ oc adm release extract --from=%q --to=/tmp/artifacts/release-payload-%s return step.Run(ctx, dry) } +// importFromReleaseImage uses the provided release image and updates the stable / release streams as +// appropriate with the contents of the payload so that downstream components are using the correct images. +// The most common case is to use the correct installer image, tests, and cli commands. +func (s *assembleReleaseStep) importFromReleaseImage(ctx context.Context, dry bool, providedImage string) error { + tag := s.tag() + streamName := s.streamName() + + if dry { + return nil + } + + start := time.Now() + + // create the stable image stream with lookup policy + _, err := s.imageClient.ImageStreams(s.jobSpec.Namespace).Create(&imageapi.ImageStream{ + ObjectMeta: meta.ObjectMeta{ + Name: streamName, + }, + Spec: imageapi.ImageStreamSpec{ + LookupPolicy: imageapi.ImageLookupPolicy{ + Local: true, + }, + }, + }) + if err != nil && !errors.IsAlreadyExists(err) { + return fmt.Errorf("could not create stable imagestreamtag: %v", err) + } + // tag the release image in and let it import + if _, err := s.imageClient.ImageStreamTags(s.jobSpec.Namespace).Update(&imageapi.ImageStreamTag{ + ObjectMeta: meta.ObjectMeta{ + Name: fmt.Sprintf("release:%s", tag), + }, + Tag: &imageapi.TagReference{ + From: &coreapi.ObjectReference{ + Kind: "DockerImage", + Name: providedImage, + }, + }, + }); err != nil { + return err + } + // wait for the release image to be available + var pullSpec string + if err := wait.PollImmediate(time.Second, time.Minute, func() (bool, error) { + is, err := s.imageClient.ImageStreams(s.jobSpec.Namespace).Get("release", meta.GetOptions{}) + if err != nil { + return false, err + } + ref, _ := findStatusTag(is, tag) + if ref == nil { + return false, nil + } + var registry string + if len(is.Status.PublicDockerImageRepository) > 0 { + registry = is.Status.PublicDockerImageRepository + } else if len(is.Status.DockerImageRepository) > 0 { + registry = is.Status.DockerImageRepository + } else { + return false, fmt.Errorf("image stream %s has no accessible image registry value", is.Name) + } + pullSpec = fmt.Sprintf("%s:%s", registry, tag) + return true, nil + }); err != nil { + return fmt.Errorf("unable to import %s release image: %v", tag, err) + } + + // override anything in stable with the contents of the release image + // TODO: should we allow underride for things we built in pipeline? 
+	artifactDir := s.artifactDir
+	if len(artifactDir) == 0 {
+		var err error
+		artifactDir, err = ioutil.TempDir("", "payload-images")
+		if err != nil {
+			return fmt.Errorf("unable to create temporary artifact dir for payload extraction: %v", err)
+		}
+	}
+
+	// get the CLI image from the payload (since we need it to run oc adm release extract)
+	target := fmt.Sprintf("release-images-%s", tag)
+	targetCLI := fmt.Sprintf("%s-cli", target)
+	if err := steps.RunPod(s.podClient, &coreapi.Pod{
+		ObjectMeta: meta.ObjectMeta{
+			Name:      targetCLI,
+			Namespace: s.jobSpec.Namespace,
+		},
+		Spec: coreapi.PodSpec{
+			RestartPolicy: coreapi.RestartPolicyNever,
+			Containers: []coreapi.Container{
+				{
+					Name:    "release",
+					Image:   pullSpec,
+					Command: []string{"/bin/sh", "-c", "cluster-version-operator image cli > /dev/termination-log"},
+				},
+			},
+		},
+	}); err != nil {
+		return fmt.Errorf("unable to find the 'cli' image in the provided release image: %v", err)
+	}
+	pod, err := s.podClient.Pods(s.jobSpec.Namespace).Get(targetCLI, meta.GetOptions{})
+	if err != nil {
+		return fmt.Errorf("unable to extract the 'cli' image from the release image: %v", err)
+	}
+	if len(pod.Status.ContainerStatuses) == 0 || pod.Status.ContainerStatuses[0].State.Terminated == nil {
+		return fmt.Errorf("unable to extract the 'cli' image from the release image: pod %s did not report a termination message", targetCLI)
+	}
+	cliImage := pod.Status.ContainerStatuses[0].State.Terminated.Message
+
+	// tag the cli image into stable so we use the correct pull secrets from the namespace
+	if _, err := s.imageClient.ImageStreamTags(s.jobSpec.Namespace).Update(&imageapi.ImageStreamTag{
+		ObjectMeta: meta.ObjectMeta{
+			Name: fmt.Sprintf("%s:cli", streamName),
+		},
+		Tag: &imageapi.TagReference{
+			From: &coreapi.ObjectReference{
+				Kind: "DockerImage",
+				Name: cliImage,
+			},
+		},
+	}); err != nil {
+		return err
+	}
+
+	// run adm release extract and grab the raw image-references from the payload
+	podConfig := steps.PodStepConfiguration{
+		SkipLogs: true,
+		As:       target,
+		From: api.ImageStreamTagReference{
+			Name: api.StableImageStream,
+			Tag:  "cli",
+		},
+		ServiceAccountName: "builder",
+		ArtifactDir:        "/tmp/artifacts",
+		Commands: fmt.Sprintf(`
+set -euo pipefail
+export HOME=/tmp
+oc registry login
+oc adm release extract --from=%q --file=image-references > /tmp/artifacts/%s
+`, pullSpec, target),
+	}
+
+	// set an explicit default for release-latest resources, but allow customization if necessary
+	resources := s.resources
+	if _, ok := resources[podConfig.As]; !ok {
+		copied := make(api.ResourceConfiguration)
+		for k, v := range resources {
+			copied[k] = v
+		}
+		// max cpu observed at 0.1 core, most memory ~ 420M
+		copied[podConfig.As] = api.ResourceRequirements{Requests: api.ResourceList{"cpu": "50m", "memory": "400Mi"}}
+		resources = copied
+	}
+	step := steps.PodStep("release", podConfig, resources, s.podClient, artifactDir, s.jobSpec)
+	if err := step.Run(ctx, false); err != nil {
+		return err
+	}
+
+	// read the contents from the artifacts directory
+	isContents, err := ioutil.ReadFile(filepath.Join(artifactDir, podConfig.As, target))
+	if err != nil {
+		return fmt.Errorf("unable to read release image stream: %v", err)
+	}
+	var releaseIS imageapi.ImageStream
+	if err := json.Unmarshal(isContents, &releaseIS); err != nil {
+		return fmt.Errorf("unable to decode release image stream: %v", err)
+	}
+	if releaseIS.Kind != "ImageStream" || releaseIS.APIVersion != "image.openshift.io/v1" {
+		return fmt.Errorf("unexpected image stream contents: kind %q apiVersion %q", releaseIS.Kind, releaseIS.APIVersion)
+	}
+
+	// update the stable image stream to have all of the tags from the payload
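+	// tags from the payload take precedence; tags that exist only in stable are carried
+	// over, and the update is retried on conflict in case another writer races with us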
+	if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
+		stable, err := s.imageClient.ImageStreams(s.jobSpec.Namespace).Get(streamName, meta.GetOptions{})
+		if err != nil {
+			return fmt.Errorf("could not resolve imagestream %s: %v", streamName, err)
+		}
+
+		existing := sets.NewString()
+		tags := make([]imageapi.TagReference, 0, len(releaseIS.Spec.Tags)+len(stable.Spec.Tags))
+		for _, tag := range releaseIS.Spec.Tags {
+			existing.Insert(tag.Name)
+			tags = append(tags, tag)
+		}
+		for _, tag := range stable.Spec.Tags {
+			if existing.Has(tag.Name) {
+				continue
+			}
+			existing.Insert(tag.Name)
+			tags = append(tags, tag)
+		}
+		stable.Spec.Tags = tags
+
+		_, err = s.imageClient.ImageStreams(s.jobSpec.Namespace).Update(stable)
+		return err
+	}); err != nil {
+		return fmt.Errorf("unable to update stable image stream with release tags: %v", err)
+	}
+
+	log.Printf("Imported %s to release:%s and updated %s images in %s", providedImage, tag, streamName, time.Now().Sub(start).Truncate(time.Second))
+	return nil
+}
+
 func (s *assembleReleaseStep) Done() (bool, error) {
 	// TODO: define done
 	return true, nil
@@ -172,6 +376,13 @@ func (s *assembleReleaseStep) tag() string {
 	return "initial"
 }
 
+func (s *assembleReleaseStep) streamName() string {
+	if s.latest {
+		return api.StableImageStream
+	}
+	return fmt.Sprintf("%s-initial", api.StableImageStream)
+}
+
 func (s *assembleReleaseStep) envVar() string {
 	return fmt.Sprintf("RELEASE_IMAGE_%s", strings.ToUpper(s.tag()))
 }
diff --git a/pkg/steps/template.go b/pkg/steps/template.go
index a859d2df..83438782 100644
--- a/pkg/steps/template.go
+++ b/pkg/steps/template.go
@@ -156,7 +156,7 @@ func (s *templateExecutionStep) Run(ctx context.Context, dry bool) error {
 	for _, ref := range instance.Status.Objects {
 		switch {
 		case ref.Ref.Kind == "Pod" && ref.Ref.APIVersion == "v1":
-			err := waitForPodCompletion(s.podClient.Pods(s.jobSpec.Namespace), ref.Ref.Name, testCaseNotifier)
+			err := waitForPodCompletion(s.podClient.Pods(s.jobSpec.Namespace), ref.Ref.Name, testCaseNotifier, false)
 			s.subTests = append(s.subTests, testCaseNotifier.SubTests(fmt.Sprintf("%s - %s ", s.Description(), ref.Ref.Name))...)
 			if err != nil {
 				return fmt.Errorf("template pod %q failed: %v", ref.Ref.Name, err)
@@ -446,11 +446,10 @@ func waitForCompletedPodDeletion(podClient coreclientset.PodInterface, name stri
 	return waitForPodDeletion(podClient, name, uid)
 }
 
-func waitForPodCompletion(podClient coreclientset.PodInterface, name string, notifier ContainerNotifier) error {
+func waitForPodCompletion(podClient coreclientset.PodInterface, name string, notifier ContainerNotifier, skipLogs bool) error {
 	if notifier == nil {
 		notifier = NopNotifier
 	}
-	skipLogs := false
 	completed := make(map[string]time.Time)
 	for {
 		retry, err := waitForPodCompletionOrTimeout(podClient, name, completed, notifier, skipLogs)
@@ -494,7 +493,7 @@ func waitForPodCompletionOrTimeout(podClient coreclientset.PodInterface, name st
 	if pod.Spec.RestartPolicy == coreapi.RestartPolicyAlways {
 		return false, nil
 	}
-	podLogNewFailedContainers(podClient, pod, completed, notifier)
+	podLogNewFailedContainers(podClient, pod, completed, notifier, skipLogs)
 	if podJobIsOK(pod) {
 		if !skipLogs {
 			log.Printf("Pod %s already succeeded in %s", pod.Name, podDuration(pod).Truncate(time.Second))
@@ -512,9 +511,11 @@ func waitForPodCompletionOrTimeout(podClient coreclientset.PodInterface, name st
 			return true, nil
 		}
 		if pod, ok := event.Object.(*coreapi.Pod); ok {
-			podLogNewFailedContainers(podClient, pod, completed, notifier)
+			podLogNewFailedContainers(podClient, pod, completed, notifier, skipLogs)
 			if podJobIsOK(pod) {
-				log.Printf("Pod %s succeeded after %s", pod.Name, podDuration(pod).Truncate(time.Second))
+				if !skipLogs {
+					log.Printf("Pod %s succeeded after %s", pod.Name, podDuration(pod).Truncate(time.Second))
+				}
 				return false, nil
 			}
 			if podJobIsFailed(pod) {
@@ -523,7 +524,7 @@ func waitForPodCompletionOrTimeout(podClient coreclientset.PodInterface, name st
 			continue
 		}
 		if event.Type == watch.Deleted {
-			podLogNewFailedContainers(podClient, pod, completed, notifier)
+			podLogNewFailedContainers(podClient, pod, completed, notifier, skipLogs)
 			return false, appendLogToError(fmt.Errorf("the pod %s/%s was deleted without completing after %s (failed containers: %s)", pod.Namespace, pod.Name, podDuration(pod).Truncate(time.Second), strings.Join(failedContainerNames(pod), ", ")), podMessages(pod))
 		}
 		log.Printf("error: Unrecognized event in watch: %v %#v", event.Type, event.Object)
@@ -680,7 +681,7 @@ func failedContainerNames(pod *coreapi.Pod) []string {
 	return names
 }
 
-func podLogNewFailedContainers(podClient coreclientset.PodInterface, pod *coreapi.Pod, completed map[string]time.Time, notifier ContainerNotifier) {
+func podLogNewFailedContainers(podClient coreclientset.PodInterface, pod *coreapi.Pod, completed map[string]time.Time, notifier ContainerNotifier, skipLogs bool) {
 	var statuses []coreapi.ContainerStatus
 	statuses = append(statuses, pod.Status.InitContainerStatuses...)
 	statuses = append(statuses, pod.Status.ContainerStatuses...)
@@ -697,7 +698,9 @@ func podLogNewFailedContainers(podClient coreclientset.PodInterface, pod *coreap
 		notifier.Notify(pod, status.Name)
 
 		if s.ExitCode == 0 {
-			log.Printf("Container %s in pod %s completed successfully", status.Name, pod.Name)
+			if !skipLogs {
+				log.Printf("Container %s in pod %s completed successfully", status.Name, pod.Name)
+			}
 			continue
 		}