diff --git a/pkg/cincinnati/cincinnati.go b/pkg/cincinnati/cincinnati.go index 25774cc11..cd6b282b1 100644 --- a/pkg/cincinnati/cincinnati.go +++ b/pkg/cincinnati/cincinnati.go @@ -5,6 +5,7 @@ import ( "fmt" "io/ioutil" "net/http" + "net/url" "github.com/blang/semver" "github.com/google/uuid" @@ -38,8 +39,19 @@ type Update node // the current version and their payloads indicate from where the actual update // image can be downloaded. func (c Client) GetUpdates(upstream string, channel string, version semver.Version) ([]Update, error) { + // Prepare parametrized cincinnati query. + cincinnatiURL, err := url.Parse(upstream) + if err != nil { + return nil, fmt.Errorf("failed to parse upstream URL: %s", err) + } + queryParams := cincinnatiURL.Query() + queryParams.Add("channel", channel) + queryParams.Add("id", c.id.String()) + queryParams.Add("version", version.String()) + cincinnatiURL.RawQuery = queryParams.Encode() + // Download the update graph. - req, err := http.NewRequest("GET", upstream, nil) + req, err := http.NewRequest("GET", cincinnatiURL.String(), nil) if err != nil { return nil, err } diff --git a/pkg/cincinnati/cincinnati_test.go b/pkg/cincinnati/cincinnati_test.go index 4300066ea..e07f55908 100644 --- a/pkg/cincinnati/cincinnati_test.go +++ b/pkg/cincinnati/cincinnati_test.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" "net/http/httptest" + "net/url" "reflect" "testing" @@ -14,97 +15,115 @@ import ( ) func TestGetUpdates(t *testing.T) { - handler := func(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodGet && r.Method != http.MethodHead { - w.WriteHeader(http.StatusMethodNotAllowed) - return - } - - mtype := r.Header.Get("Accept") - if mtype != GraphMediaType { - w.WriteHeader(http.StatusUnsupportedMediaType) - return - } - - _, err := w.Write([]byte(`{ - "nodes": [ - { - "version": "4.0.0-4", - "payload": "quay.io/openshift-release-dev/ocp-release:4.0.0-4", - "metadata": {} - }, - { - "version": "4.0.0-5", - "payload": "quay.io/openshift-release-dev/ocp-release:4.0.0-5", - "metadata": {} - }, - { - "version": "4.0.0-6", - "payload": "quay.io/openshift-release-dev/ocp-release:4.0.0-6", - "metadata": {} - }, - { - "version": "4.0.0-6+2", - "payload": "quay.io/openshift-release-dev/ocp-release:4.0.0-6+2", - "metadata": {} - }, - { - "version": "4.0.0-0.okd-0", - "payload": "quay.io/openshift-release-dev/ocp-release:4.0.0-0.okd-0", - "metadata": {} - }, - { - "version": "4.0.0-0.2", - "payload": "quay.io/openshift-release-dev/ocp-release:4.0.0-0.2", - "metadata": {} - }, - { - "version": "4.0.0-0.3", - "payload": "quay.io/openshift-release-dev/ocp-release:4.0.0-0.3", - "metadata": {} - } - ], - "edges": [[0,1],[1,2],[1,3],[5,6]] - }`)) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - } - + clientID := uuid.Must(uuid.Parse("01234567-0123-0123-0123-0123456789ab")) + channelName := "test-channel" tests := []struct { name string version string - available []Update - err string + expectedQuery string + available []Update + err string }{{ - name: "one update available", - version: "4.0.0-4", + name: "one update available", + version: "4.0.0-4", + expectedQuery: "channel=test-channel&id=01234567-0123-0123-0123-0123456789ab&version=4.0.0-4", available: []Update{ {semver.MustParse("4.0.0-5"), "quay.io/openshift-release-dev/ocp-release:4.0.0-5"}, }, }, { - name: "two updates available", - version: "4.0.0-5", + name: "two updates available", + version: "4.0.0-5", + expectedQuery: "channel=test-channel&id=01234567-0123-0123-0123-0123456789ab&version=4.0.0-5", available: []Update{ {semver.MustParse("4.0.0-6"), "quay.io/openshift-release-dev/ocp-release:4.0.0-6"}, {semver.MustParse("4.0.0-6+2"), "quay.io/openshift-release-dev/ocp-release:4.0.0-6+2"}, }, }, { - name: "no updates available", - version: "4.0.0-0.okd-0", + name: "no updates available", + version: "4.0.0-0.okd-0", + expectedQuery: "channel=test-channel&id=01234567-0123-0123-0123-0123456789ab&version=4.0.0-0.okd-0", }, { - name: "unknown version", - version: "4.0.0-3", - err: "unknown version 4.0.0-3", + name: "unknown version", + version: "4.0.0-3", + expectedQuery: "channel=test-channel&id=01234567-0123-0123-0123-0123456789ab&version=4.0.0-3", + err: "unknown version 4.0.0-3", }} for _, test := range tests { t.Run(test.name, func(t *testing.T) { + requestQuery := make(chan string, 1) + defer close(requestQuery) + + handler := func(w http.ResponseWriter, r *http.Request) { + select { + case requestQuery <- r.URL.RawQuery: + default: + t.Fatalf("received multiple requests at upstream URL") + } + + if r.Method != http.MethodGet && r.Method != http.MethodHead { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + mtype := r.Header.Get("Accept") + if mtype != GraphMediaType { + w.WriteHeader(http.StatusUnsupportedMediaType) + return + } + + _, err := w.Write([]byte(`{ + "nodes": [ + { + "version": "4.0.0-4", + "payload": "quay.io/openshift-release-dev/ocp-release:4.0.0-4", + "metadata": {} + }, + { + "version": "4.0.0-5", + "payload": "quay.io/openshift-release-dev/ocp-release:4.0.0-5", + "metadata": {} + }, + { + "version": "4.0.0-6", + "payload": "quay.io/openshift-release-dev/ocp-release:4.0.0-6", + "metadata": {} + }, + { + "version": "4.0.0-6+2", + "payload": "quay.io/openshift-release-dev/ocp-release:4.0.0-6+2", + "metadata": {} + }, + { + "version": "4.0.0-0.okd-0", + "payload": "quay.io/openshift-release-dev/ocp-release:4.0.0-0.okd-0", + "metadata": {} + }, + { + "version": "4.0.0-0.2", + "payload": "quay.io/openshift-release-dev/ocp-release:4.0.0-0.2", + "metadata": {} + }, + { + "version": "4.0.0-0.3", + "payload": "quay.io/openshift-release-dev/ocp-release:4.0.0-0.3", + "metadata": {} + } + ], + "edges": [[0,1],[1,2],[1,3],[5,6]] + }`)) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + } + ts := httptest.NewServer(http.HandlerFunc(handler)) - c := NewClient(uuid.New()) + defer ts.Close() - updates, err := c.GetUpdates(ts.URL, "", semver.MustParse(test.version)) + c := NewClient(clientID) + + updates, err := c.GetUpdates(ts.URL, channelName, semver.MustParse(test.version)) if test.err == "" { if err != nil { t.Fatalf("expected nil error, got: %v", err) @@ -117,6 +136,24 @@ func TestGetUpdates(t *testing.T) { t.Fatalf("expected err to be %s, got: %v", test.err, err) } } + + actualQuery := "" + select { + case actualQuery = <-requestQuery: + default: + t.Fatal("no request received at upstream URL") + } + expectedQueryValues, err := url.ParseQuery(test.expectedQuery) + if err != nil { + t.Fatalf("could not parse expected query: %v", err) + } + actualQueryValues, err := url.ParseQuery(actualQuery) + if err != nil { + t.Fatalf("could not parse acutal query: %v", err) + } + if e, a := expectedQueryValues, actualQueryValues; !reflect.DeepEqual(e, a) { + t.Errorf("expected query to be %q, got: %q", e, a) + } }) } } diff --git a/pkg/start/start_integration_test.go b/pkg/start/start_integration_test.go index b9321a54b..5c3f82128 100644 --- a/pkg/start/start_integration_test.go +++ b/pkg/start/start_integration_test.go @@ -5,6 +5,9 @@ import ( "encoding/json" "fmt" "io/ioutil" + "net/http" + "net/http/httptest" + "net/url" "os" "path/filepath" "reflect" @@ -13,6 +16,7 @@ import ( "testing" "time" + "github.com/google/uuid" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -552,6 +556,143 @@ func TestIntegrationCVO_gracefulStepDown(t *testing.T) { } } +func TestIntegrationCVO_cincinnatiRequest(t *testing.T) { + if os.Getenv("TEST_INTEGRATION") != "1" { + t.Skipf("Integration tests are disabled unless TEST_INTEGRATION=1") + } + t.Parallel() + + requestQuery := make(chan string, 100) + defer close(requestQuery) + + handler := func(w http.ResponseWriter, r *http.Request) { + select { + case requestQuery <- r.URL.RawQuery: + default: + t.Errorf("received too many requests at upstream URL") + } + } + upstreamServer := httptest.NewServer(http.HandlerFunc(handler)) + defer upstreamServer.Close() + + // use the same client setup as the start command + cb, err := newClientBuilder("") + if err != nil { + t.Fatal(err) + } + cfg := cb.RestConfig() + kc := cb.KubeClientOrDie("integration-test") + client := cb.ClientOrDie("integration-test") + + ns := fmt.Sprintf("e2e-cvo-%s", randutil.String(4)) + + if _, err := kc.Core().Namespaces().Create(&v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: ns, + }, + }); err != nil { + t.Fatal(err) + } + defer func() { + if err := client.Config().ClusterVersions().Delete(ns, nil); err != nil { + t.Logf("failed to delete cluster version %s: %v", ns, err) + } + if err := kc.Core().Namespaces().Delete(ns, nil); err != nil { + t.Logf("failed to delete namespace %s: %v", ns, err) + } + }() + + id, _ := uuid.NewRandom() + + client.ConfigV1().ClusterVersions().Create(&configv1.ClusterVersion{ + ObjectMeta: metav1.ObjectMeta{ + Name: ns, + }, + Spec: configv1.ClusterVersionSpec{ + Upstream: configv1.URL(upstreamServer.URL), + Channel: "test-channel", + ClusterID: configv1.ClusterID(id.String()), + }, + }) + + dir, err := ioutil.TempDir("", "cvo-test") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + if err := createContent(filepath.Join(dir, "0.0.1"), version_0_0_1, map[string]string{"NAMESPACE": ns}); err != nil { + t.Fatal(err) + } + payloadImage1 := "arbitrary/release:image" + retriever := &mapPayloadRetriever{map[string]string{ + payloadImage1: filepath.Join(dir, "0.0.1"), + }} + payloadDir := filepath.Join(dir, "payload") + if err := os.Mkdir(payloadDir, 0777); err != nil { + t.Fatal(err) + } + manifestsDir := filepath.Join(payloadDir, "manifests") + if err := os.Mkdir(manifestsDir, 0777); err != nil { + t.Fatal(err) + } + releaseManifestsDir := filepath.Join(payloadDir, "release-manifests") + if err := os.Mkdir(releaseManifestsDir, 0777); err != nil { + t.Fatal(err) + } + if err := ioutil.WriteFile(filepath.Join(releaseManifestsDir, "image-references"), []byte(`kind: ImageStream +apiVersion: image.openshift.io/v1 +metadata: + name: 0.0.1 +`), 0777); err != nil { + t.Fatal(err) + } + + options := NewOptions() + options.Namespace = ns + options.Name = ns + options.ListenAddr = "" + options.NodeName = "test-node" + options.ReleaseImage = payloadImage1 + options.PayloadOverride = payloadDir + options.EnableMetrics = false + controllers := options.NewControllerContext(cb) + + worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker) + controllers.CVO.SetSyncWorkerForTesting(worker) + + stopCh := make(chan struct{}) + defer close(stopCh) + controllers.Start(stopCh) + + t.Logf("wait until we observe the cluster version become available") + lastCV, err := waitForUpdateAvailable(t, client, ns, false, "0.0.1") + if err != nil { + t.Logf("latest version:\n%s", printCV(lastCV)) + t.Fatalf("cluster version never became available: %v", err) + } + + t.Logf("wait until we observe the request to the upstream url") + actualQuery := "" + select { + case actualQuery = <-requestQuery: + case <-time.After(10 * time.Second): + t.Logf("latest version:\n%s", printCV(lastCV)) + t.Fatal("no request received at upstream URL") + } + expectedQuery := fmt.Sprintf("channel=test-channel&id=%s&version=0.0.1", id.String()) + expectedQueryValues, err := url.ParseQuery(expectedQuery) + if err != nil { + t.Fatalf("could not parse expected query: %v", err) + } + actualQueryValues, err := url.ParseQuery(actualQuery) + if err != nil { + t.Fatalf("could not parse acutal query: %v", err) + } + if e, a := expectedQueryValues, actualQueryValues; !reflect.DeepEqual(e, a) { + t.Errorf("expected query to be %q, got: %q", e, a) + } +} + // waitForAvailableUpdate checks invariants during an upgrade process. versions is a list of the expected versions that // should be seen during update, with the last version being the one we wait to see. func waitForUpdateAvailable(t *testing.T, client clientset.Interface, ns string, allowIncrementalFailure bool, versions ...string) (*configv1.ClusterVersion, error) {