diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 57e35cbef8..f4935f5966 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -223,6 +223,13 @@ func TestCloudAuditLogsSourceBrokerWithPubSubChannel(t *testing.T) { AuditLogsSourceBrokerWithPubSubChannelTestImpl(t) } +// TestCloudSchedulerSourceBrokerWithPubSubChannel tests we can knock a Knative Service from a broker with PubSub Channel from a CloudSchedulerSource. +func TestCloudSchedulerSourceBrokerWithPubSubChannel(t *testing.T) { + cancel := logstream.Start(t) + defer cancel() + SchedulerSourceBrokerWithPubSubChannelTestImpl(t) +} + // TestCloudStorageSource tests we can knock down a target from a CloudStorageSource. func TestCloudStorageSource(t *testing.T) { cancel := logstream.Start(t) diff --git a/test/e2e/lib/scheduler.go b/test/e2e/lib/scheduler.go new file mode 100644 index 0000000000..b7be5cec4e --- /dev/null +++ b/test/e2e/lib/scheduler.go @@ -0,0 +1,61 @@ +/* +Copyright 2020 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package lib + +import ( + "github.com/google/knative-gcp/pkg/apis/events/v1alpha1" + kngcpresources "github.com/google/knative-gcp/pkg/reconciler/events/scheduler/resources" + kngcptesting "github.com/google/knative-gcp/pkg/reconciler/testing" + "github.com/google/knative-gcp/test/e2e/lib/resources" + v1 "k8s.io/api/core/v1" +) + +func MakeSchedulerOrDie(client *Client, + sName, data, targetName string, + so ...kngcptesting.CloudSchedulerSourceOption, +) { + so = append(so, kngcptesting.WithCloudSchedulerSourceLocation("us-central1")) + so = append(so, kngcptesting.WithCloudSchedulerSourceData(data)) + so = append(so, kngcptesting.WithCloudSchedulerSourceSchedule("* * * * *")) + so = append(so, kngcptesting.WithCloudSchedulerSourceSink(ServiceGVK, targetName)) + scheduler := kngcptesting.NewCloudSchedulerSource(sName, client.Namespace, so...) + + client.CreateSchedulerOrFail(scheduler) + client.Core.WaitForResourceReadyOrFail(sName, CloudSchedulerSourceTypeMeta) +} + +func MakeSchedulerJobOrDie(client *Client, data, targetName string) { + job := resources.SchedulerJob(targetName, []v1.EnvVar{ + { + Name: "TIME", + Value: "360", + }, + { + Name: "SUBJECT_PREFIX", + Value: kngcpresources.JobPrefix, + }, + { + Name: "DATA", + Value: data, + }, + { + Name: "TYPE", + Value: v1alpha1.CloudSchedulerSourceExecute, + }, + }) + client.CreateJobOrFail(job, WithServiceForJob(targetName)) +} diff --git a/test/e2e/test_broker_pubsub.go b/test/e2e/test_broker_pubsub.go index 673e3cda9a..b2f7c8aaa5 100644 --- a/test/e2e/test_broker_pubsub.go +++ b/test/e2e/test_broker_pubsub.go @@ -247,6 +247,58 @@ func AuditLogsSourceBrokerWithPubSubChannelTestImpl(t *testing.T) { } } +func SchedulerSourceBrokerWithPubSubChannelTestImpl(t *testing.T) { + data := "my test data" + targetName := "event-display" + sName := "scheduler-test" + + client := lib.Setup(t, true) + defer lib.TearDown(client) + + // Create a target Job to receive the events. + lib.MakeSchedulerJobOrDie(client, data, targetName) + + u := createBrokerWithPubSubChannel(t, client, targetName) + + var url apis.URL = apis.URL(u) + // Just to make sure all resources are ready. + time.Sleep(5 * time.Second) + + // Create the CloudSchedulerSource. + lib.MakeSchedulerOrDie(client, sName, data, targetName, + kngcptesting.WithCloudSchedulerSourceSinkURI(&url), + ) + + msg, err := client.WaitUntilJobDone(client.Namespace, targetName) + if err != nil { + t.Error(err) + } + + t.Logf("Last termination message => %s", msg) + if msg != "" { + out := &lib.TargetOutput{} + if err := json.Unmarshal([]byte(msg), out); err != nil { + t.Error(err) + } + if !out.Success { + // Log the output of scheduler pods + if logs, err := client.LogsFor(client.Namespace, sName, lib.CloudSchedulerSourceTypeMeta); err != nil { + t.Error(err) + } else { + t.Logf("scheduler log: %+v", logs) + } + + // Log the output of the target job pods + if logs, err := client.LogsFor(client.Namespace, targetName, lib.JobTypeMeta); err != nil { + t.Error(err) + } else { + t.Logf("addressable job: %+v", logs) + } + t.Fail() + } + } +} + func createBrokerWithPubSubChannel(t *testing.T, client *lib.Client, targetName string) url.URL { brokerName := helpers.AppendRandomString("pubsub") dummyTriggerName := "dummy-broker-" + brokerName diff --git a/test/e2e/test_scheduler.go b/test/e2e/test_scheduler.go index 2938b46122..8c707e3a19 100644 --- a/test/e2e/test_scheduler.go +++ b/test/e2e/test_scheduler.go @@ -20,13 +20,8 @@ import ( "encoding/json" "testing" - v1 "k8s.io/api/core/v1" - - "github.com/google/knative-gcp/pkg/apis/events/v1alpha1" - kngcpresources "github.com/google/knative-gcp/pkg/reconciler/events/scheduler/resources" kngcptesting "github.com/google/knative-gcp/pkg/reconciler/testing" "github.com/google/knative-gcp/test/e2e/lib" - "github.com/google/knative-gcp/test/e2e/lib/resources" ) // SmokeCloudSchedulerSourceSetup tests if a CloudSchedulerSource object can be created and be made ready. @@ -56,38 +51,12 @@ func CloudSchedulerSourceWithTargetTestImpl(t *testing.T) { // Create an Addressable to receive scheduler events data := "my test data" targetName := "event-display" - job := resources.SchedulerJob(targetName, []v1.EnvVar{ - { - Name: "TIME", - Value: "360", - }, - { - Name: "SUBJECT_PREFIX", - Value: kngcpresources.JobPrefix, - }, - { - Name: "DATA", - Value: data, - }, - { - Name: "TYPE", - Value: v1alpha1.CloudSchedulerSourceExecute, - }, - }) - client.CreateJobOrFail(job, lib.WithServiceForJob(targetName)) + lib.MakeSchedulerJobOrDie(client, data, targetName) // Create a scheduler sName := "scheduler-test" - scheduler := kngcptesting.NewCloudSchedulerSource(sName, client.Namespace, - kngcptesting.WithCloudSchedulerSourceLocation("us-central1"), - kngcptesting.WithCloudSchedulerSourceData(data), - kngcptesting.WithCloudSchedulerSourceSchedule("* * * * *"), - kngcptesting.WithCloudSchedulerSourceSink(lib.ServiceGVK, targetName), - ) - - client.CreateSchedulerOrFail(scheduler) - client.Core.WaitForResourceReadyOrFail(sName, lib.CloudSchedulerSourceTypeMeta) + lib.MakeSchedulerOrDie(client, sName, data, targetName) msg, err := client.WaitUntilJobDone(client.Namespace, targetName) if err != nil {