diff --git a/pkg/gclient/scheduler/client.go b/pkg/gclient/scheduler/client.go index 66f79f5f73..408af57e29 100644 --- a/pkg/gclient/scheduler/client.go +++ b/pkg/gclient/scheduler/client.go @@ -19,7 +19,7 @@ package scheduler import ( "context" - scheduler "cloud.google.com/go/scheduler/apiv1" + "cloud.google.com/go/scheduler/apiv1" "github.com/googleapis/gax-go/v2" "google.golang.org/api/option" schedulerpb "google.golang.org/genproto/googleapis/cloud/scheduler/v1" @@ -57,6 +57,11 @@ func (c *schedulerClient) CreateJob(ctx context.Context, req *schedulerpb.Create return c.client.CreateJob(ctx, req, opts...) } +// UpdateJob implements scheduler.CloudSchedulerClient.UpdateJobRequest +func (c *schedulerClient) UpdateJob(ctx context.Context, req *schedulerpb.UpdateJobRequest, opts ...gax.CallOption) (*schedulerpb.Job, error) { + return c.client.UpdateJob(ctx, req, opts...) +} + // DeleteJob implements scheduler.CloudSchedulerClient.DeleteJob func (c *schedulerClient) DeleteJob(ctx context.Context, req *schedulerpb.DeleteJobRequest, opts ...gax.CallOption) error { return c.client.DeleteJob(ctx, req, opts...) diff --git a/pkg/gclient/scheduler/interfaces.go b/pkg/gclient/scheduler/interfaces.go index 09198f3bd0..7a06c60f13 100644 --- a/pkg/gclient/scheduler/interfaces.go +++ b/pkg/gclient/scheduler/interfaces.go @@ -30,6 +30,8 @@ type Client interface { Close() error // CreateJob see https://godoc.org/cloud.google.com/go/scheduler/apiv1#CloudSchedulerClient.CreateJob CreateJob(ctx context.Context, req *schedulerpb.CreateJobRequest, opts ...gax.CallOption) (*schedulerpb.Job, error) + // UpdateJob see https://godoc.org/cloud.google.com/go/scheduler/apiv1#CloudSchedulerClient.UpdateJob + UpdateJob(ctx context.Context, req *schedulerpb.UpdateJobRequest, opts ...gax.CallOption) (*schedulerpb.Job, error) // DeleteJob see https://godoc.org/cloud.google.com/go/scheduler/apiv1#CloudSchedulerClient.DeleteJob DeleteJob(ctx context.Context, req *schedulerpb.DeleteJobRequest, opts ...gax.CallOption) error // GetJob see https://godoc.org/cloud.google.com/go/scheduler/apiv1#CloudSchedulerClient.GetJob diff --git a/pkg/gclient/scheduler/testing/client.go b/pkg/gclient/scheduler/testing/client.go index 04c9e45031..c6728b16e4 100644 --- a/pkg/gclient/scheduler/testing/client.go +++ b/pkg/gclient/scheduler/testing/client.go @@ -51,6 +51,7 @@ type TestClientData struct { CreateClientErr error CreateJobErr error DeleteJobErr error + UpdateJobErr error GetJobErr error CloseErr error } @@ -83,6 +84,16 @@ func (c *testClient) DeleteJob(ctx context.Context, req *schedulerpb.DeleteJobRe return c.data.DeleteJobErr } +// UpdateJob implements client.UpdateJob +func (c *testClient) UpdateJob(ctx context.Context, req *schedulerpb.UpdateJobRequest, opts ...gax.CallOption) (*schedulerpb.Job, error) { + if c.data.UpdateJobErr != nil { + return nil, c.data.UpdateJobErr + } + return &schedulerpb.Job{ + Name: req.Job.Name, + }, nil +} + // GetJob implements client.GetJob func (c *testClient) GetJob(ctx context.Context, req *schedulerpb.GetJobRequest, opts ...gax.CallOption) (*schedulerpb.Job, error) { if c.data.GetJobErr != nil { diff --git a/pkg/reconciler/events/scheduler/scheduler.go b/pkg/reconciler/events/scheduler/scheduler.go index 858929e901..c2887c93ec 100644 --- a/pkg/reconciler/events/scheduler/scheduler.go +++ b/pkg/reconciler/events/scheduler/scheduler.go @@ -18,15 +18,13 @@ package scheduler import ( "context" - "go.uber.org/zap" - corev1 "k8s.io/api/core/v1" - "knative.dev/pkg/logging" - "knative.dev/pkg/reconciler" - schedulerpb "google.golang.org/genproto/googleapis/cloud/scheduler/v1" "google.golang.org/grpc/codes" gstatus "google.golang.org/grpc/status" + corev1 "k8s.io/api/core/v1" + "knative.dev/pkg/logging" + "knative.dev/pkg/reconciler" "github.com/google/knative-gcp/pkg/apis/events/v1beta1" cloudschedulersourcereconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/events/v1beta1/cloudschedulersource" @@ -112,8 +110,10 @@ func (r *Reconciler) reconcileJob(ctx context.Context, scheduler *v1beta1.CloudS } defer client.Close() + pubsubTargetName := resources.GeneratePubSubTargetTopic(scheduler, topic) + // Check if the job exists. - _, err = client.GetJob(ctx, &schedulerpb.GetJobRequest{Name: jobName}) + job, err := client.GetJob(ctx, &schedulerpb.GetJobRequest{Name: jobName}) if err != nil { if st, ok := gstatus.FromError(err); !ok { logging.FromContext(ctx).Desugar().Error("Failed from CloudSchedulerSource client while retrieving CloudSchedulerSource job", zap.String("jobName", jobName), zap.Error(err)) @@ -121,7 +121,7 @@ func (r *Reconciler) reconcileJob(ctx context.Context, scheduler *v1beta1.CloudS } else if st.Code() == codes.NotFound { // Create the job as it does not exist. For creation, we need a parent, extract it from the jobName. parent := resources.ExtractParentName(jobName) - // Add our jobName, and schedulerName as customAttributes. + // Add jobName as customAttribute. customAttributes := map[string]string{ v1beta1.CloudSchedulerSourceJobName: jobName, } @@ -131,7 +131,7 @@ func (r *Reconciler) reconcileJob(ctx context.Context, scheduler *v1beta1.CloudS Name: jobName, Target: &schedulerpb.Job_PubsubTarget{ PubsubTarget: &schedulerpb.PubsubTarget{ - TopicName: resources.GeneratePubSubTargetTopic(scheduler, topic), + TopicName: pubsubTargetName, Data: []byte(scheduler.Spec.Data), Attributes: customAttributes, }, @@ -148,6 +148,30 @@ func (r *Reconciler) reconcileJob(ctx context.Context, scheduler *v1beta1.CloudS return err } } + // TODO remove after 0.16 cut. + actualTarget := job.GetPubsubTarget() + if actualTarget != nil && actualTarget.TopicName != pubsubTargetName { + // This means that it is using a topic with an old name. We will update the target. + _, err = client.UpdateJob(ctx, &schedulerpb.UpdateJobRequest{ + Job: &schedulerpb.Job{ + Name: job.Name, + Target: &schedulerpb.Job_PubsubTarget{ + PubsubTarget: &schedulerpb.PubsubTarget{ + TopicName: pubsubTargetName, + Data: actualTarget.Data, + Attributes: actualTarget.Attributes, + }, + }, + // Needed to add these two here otherwise I was getting an update error. + Schedule: job.Schedule, + TimeZone: job.TimeZone, + }, + }) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to update old CloudSchedulerSource job", zap.String("jobName", jobName), zap.Error(err)) + return err + } + } return nil } diff --git a/pkg/reconciler/events/storage/storage.go b/pkg/reconciler/events/storage/storage.go index 12c9899e68..d82c81a002 100644 --- a/pkg/reconciler/events/storage/storage.go +++ b/pkg/reconciler/events/storage/storage.go @@ -147,7 +147,19 @@ func (r *Reconciler) reconcileNotification(ctx context.Context, storage *v1beta1 // If the notification does exist, then return its ID. if existing, ok := notifications[storage.Status.NotificationID]; ok { - return existing.ID, nil + // TODO remove after the 0.16 cut. + // If the notification exists, need to check whether it is using the updated topic name. + // If not, then we delete it and create it again. + if existing.TopicID != storage.Status.TopicID { + err := bucket.DeleteNotification(ctx, storage.Status.NotificationID) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to delete old CloudStorageSource notification", zap.Error(err)) + return "", err + } + // We let the creation to happen after this enclosing if, thus we do not return here and need this other else. + } else { + return existing.ID, nil + } } // If the notification does not exist, then create it. diff --git a/pkg/reconciler/intevents/reconciler.go b/pkg/reconciler/intevents/reconciler.go index 2529f32ce2..ad5ed953db 100644 --- a/pkg/reconciler/intevents/reconciler.go +++ b/pkg/reconciler/intevents/reconciler.go @@ -30,7 +30,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" apierrs "k8s.io/apimachinery/pkg/api/errors" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/pkg/apis" "knative.dev/pkg/logging" pkgreconciler "knative.dev/pkg/reconciler" @@ -106,6 +106,22 @@ func (psb *PubSubBase) reconcileTopic(ctx context.Context, pubsubable duck.PubSu } else if err != nil { logging.FromContext(ctx).Desugar().Error("Failed to get Topic", zap.Error(err)) return nil, fmt.Errorf("failed to get Topic: %w", err) + // TODO remove this else if after 0.16 cut. + } else if newTopic.Spec.Topic != t.Spec.Topic { + // We check whether the topic changed. This can only happen when updating to 0.16 as the spec.topic is immutable. + // We have to delete the oldTopic and create a new one here. + logging.FromContext(ctx).Desugar().Info("Deleting old Topic", zap.Any("topic", t)) + err = topics.Delete(t.Name, nil) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to delete old Topic", zap.Any("topic", t), zap.Error(err)) + return nil, fmt.Errorf("failed to delete Topic: %w", err) + } + logging.FromContext(ctx).Desugar().Debug("Creating new Topic", zap.Any("topic", newTopic)) + t, err = topics.Create(newTopic) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to create Topic", zap.Any("topic", newTopic), zap.Error(err)) + return nil, fmt.Errorf("failed to create Topic: %w", err) + } // Check whether the specs differ and update the Topic if so. } else if !equality.Semantic.DeepDerivative(newTopic.Spec, t.Spec) { // Don't modify the informers copy. @@ -167,6 +183,22 @@ func (psb *PubSubBase) ReconcilePullSubscription(ctx context.Context, pubsubable logging.FromContext(ctx).Desugar().Error("Failed to create PullSubscription", zap.Any("ps", newPS), zap.Error(err)) return nil, pkgreconciler.NewEvent(corev1.EventTypeWarning, pullSubscriptionCreateFailedReason, "Creating PullSubscription failed with: %s", err.Error()) } + // TODO remove this else if after 0.16 cut. + } else if newPS.Spec.Topic != ps.Spec.Topic { + // We check whether the topic changed. This can only happen when updating to 0.16 as the spec.topic is immutable. + // We have to delete the old PS and create a new one here. + logging.FromContext(ctx).Desugar().Info("Deleting old PullSubscription", zap.Any("ps", ps)) + err = pullSubscriptions.Delete(ps.Name, nil) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to delete old PullSubscription", zap.Any("ps", ps), zap.Error(err)) + return nil, fmt.Errorf("failed to delete Pullsubscription: %w", err) + } + logging.FromContext(ctx).Desugar().Debug("Creating new PullSubscription", zap.Any("ps", newPS)) + ps, err = pullSubscriptions.Create(newPS) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to create PullSubscription", zap.Any("ps", newPS), zap.Error(err)) + return nil, fmt.Errorf("failed to create PullSubscription: %w", err) + } // Check whether the specs differ and update the PS if so. } else if !equality.Semantic.DeepDerivative(newPS.Spec, ps.Spec) { // Don't modify the informers copy. diff --git a/pkg/reconciler/intevents/topic/topic.go b/pkg/reconciler/intevents/topic/topic.go index 9cbaf6890f..cd943c1e37 100644 --- a/pkg/reconciler/intevents/topic/topic.go +++ b/pkg/reconciler/intevents/topic/topic.go @@ -109,7 +109,6 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, topic *v1beta1.Topic) re // If enablePublisher is false, then skip creating the publisher. if enablePublisher := topic.Spec.EnablePublisher; enablePublisher != nil && !*enablePublisher { - // TODO delete previous publishers before the 0.16 cut: https://github.com/google/knative-gcp/issues/1217 return reconciler.NewEvent(corev1.EventTypeNormal, reconciledSuccessReason, `Topic reconciled: "%s/%s"`, topic.Namespace, topic.Name) } diff --git a/pkg/reconciler/messaging/channel/channel.go b/pkg/reconciler/messaging/channel/channel.go index 91ca4dc54b..132e9fe8bb 100644 --- a/pkg/reconciler/messaging/channel/channel.go +++ b/pkg/reconciler/messaging/channel/channel.go @@ -23,7 +23,6 @@ import ( "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" - apierrors "k8s.io/apimachinery/pkg/api/errors" apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -213,6 +212,22 @@ func (r *Reconciler) syncSubscribers(ctx context.Context, channel *v1beta1.Chann return err } r.Recorder.Eventf(channel, corev1.EventTypeNormal, "SubscriberCreated", "Created Subscriber %q", ps.Name) + // TODO remove this else if after 0.16 cut. + } else if ps.Spec.Topic != existingPs.Spec.Topic { + // We check whether the topic changed. This can only happen when updating to 0.16 as the spec.topic is immutable. + // We have to delete the old PS and create a new one here. + logging.FromContext(ctx).Desugar().Info("Deleting old PullSubscription", zap.Any("ps", existingPs)) + err := r.RunClientSet.InternalV1beta1().PullSubscriptions(channel.Namespace).Delete(existingPs.Name, nil) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to delete old PullSubscription", zap.Any("ps", existingPs), zap.Error(err)) + return fmt.Errorf("failed to delete Pullsubscription: %w", err) + } + logging.FromContext(ctx).Desugar().Debug("Creating new PullSubscription", zap.Any("ps", ps)) + ps, err = r.RunClientSet.InternalV1beta1().PullSubscriptions(channel.Namespace).Create(ps) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to create PullSubscription", zap.Any("ps", ps), zap.Error(err)) + return fmt.Errorf("failed to create PullSubscription: %w", err) + } } else if !equality.Semantic.DeepEqual(ps.Spec, existingPs.Spec) { // Don't modify the informers copy. desired := existingPs.DeepCopy() @@ -285,23 +300,11 @@ func (r *Reconciler) syncSubscribersStatus(ctx context.Context, channel *v1beta1 } func (r *Reconciler) reconcileTopic(ctx context.Context, channel *v1beta1.Channel) (*inteventsv1beta1.Topic, error) { - topic, err := r.getTopic(ctx, channel) - if err != nil && !apierrors.IsNotFound(err) { - logging.FromContext(ctx).Desugar().Error("Unable to get a Topic", zap.Error(err)) - return nil, err - } - if topic != nil { - if topic.Status.Address != nil { - channel.Status.SetAddress(topic.Status.Address.URL) - } else { - channel.Status.SetAddress(nil) - } - return topic, nil - } clusterName := channel.GetAnnotations()[duckv1beta1.ClusterNameAnnotation] + name := resources.GeneratePublisherName(channel) t := resources.MakeTopic(&resources.TopicArgs{ Owner: channel, - Name: resources.GeneratePublisherName(channel), + Name: name, Project: channel.Spec.Project, ServiceAccountName: channel.Spec.ServiceAccountName, Secret: channel.Spec.Secret, @@ -310,14 +313,61 @@ func (r *Reconciler) reconcileTopic(ctx context.Context, channel *v1beta1.Channe Annotations: resources.GetTopicAnnotations(clusterName), }) - topic, err = r.RunClientSet.InternalV1beta1().Topics(channel.Namespace).Create(t) - if err != nil { - logging.FromContext(ctx).Desugar().Error("Failed to create Topic", zap.Error(err)) - r.Recorder.Eventf(channel, corev1.EventTypeWarning, "TopicCreateFailed", "Failed to created Topic %q: %s", topic.Name, err.Error()) - return nil, err + topic, err := r.getTopic(ctx, channel) + if apierrs.IsNotFound(err) { + topic, err = r.RunClientSet.InternalV1beta1().Topics(channel.Namespace).Create(t) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to create Topic", zap.Error(err)) + r.Recorder.Eventf(channel, corev1.EventTypeWarning, "TopicCreateFailed", "Failed to created Topic %q: %s", topic.Name, err.Error()) + return nil, err + } + r.Recorder.Eventf(channel, corev1.EventTypeNormal, "TopicCreated", "Created Topic %q", topic.Name) + return topic, nil + } else if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to get Topic", zap.Error(err)) + return nil, fmt.Errorf("failed to get Topic: %w", err) + } else if !metav1.IsControlledBy(topic, channel) { + channel.Status.MarkTopicNotOwned("Topic %q is owned by another resource.", name) + return nil, fmt.Errorf("Channel: %s does not own Topic: %s", channel.Name, name) + // TODO remove this else if after 0.16 cut. + } else if t.Spec.Topic != topic.Spec.Topic { + // We check whether the topic changed. This can only happen when updating to 0.16 as the spec.topic is immutable. + // We have to delete the oldTopic and create a new one here. + logging.FromContext(ctx).Desugar().Info("Deleting old Topic", zap.Any("topic", topic)) + err = r.RunClientSet.InternalV1beta1().Topics(channel.Namespace).Delete(topic.Name, nil) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to delete old Topic", zap.Any("topic", topic), zap.Error(err)) + return nil, fmt.Errorf("failed to update Topic: %w", err) + } + logging.FromContext(ctx).Desugar().Debug("Creating new Topic", zap.Any("topic", t)) + t, err = r.RunClientSet.InternalV1beta1().Topics(channel.Namespace).Create(t) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to create Topic", zap.Any("topic", t), zap.Error(err)) + return nil, fmt.Errorf("failed to create Topic: %w", err) + } + return t, nil + } else if !equality.Semantic.DeepDerivative(t.Spec, topic.Spec) { + // Don't modify the informers copy. + desired := topic.DeepCopy() + desired.Spec = t.Spec + logging.FromContext(ctx).Desugar().Debug("Updating Topic", zap.Any("topic", desired)) + t, err = r.RunClientSet.InternalV1beta1().Topics(channel.Namespace).Update(desired) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to update Topic", zap.Any("topic", topic), zap.Error(err)) + return nil, fmt.Errorf("failed to update Topic: %w", err) + } + return t, nil } - r.Recorder.Eventf(channel, corev1.EventTypeNormal, "TopicCreated", "Created Topic %q", topic.Name) - return topic, err + + if topic != nil { + if topic.Status.Address != nil { + channel.Status.SetAddress(topic.Status.Address.URL) + } else { + channel.Status.SetAddress(nil) + } + } + + return topic, nil } func (r *Reconciler) getTopic(_ context.Context, channel *v1beta1.Channel) (*inteventsv1beta1.Topic, error) { @@ -326,10 +376,6 @@ func (r *Reconciler) getTopic(_ context.Context, channel *v1beta1.Channel) (*int if err != nil { return nil, err } - if !metav1.IsControlledBy(topic, channel) { - channel.Status.MarkTopicNotOwned("Topic %q is owned by another resource.", name) - return nil, fmt.Errorf("Channel: %s does not own Topic: %s", channel.Name, name) - } return topic, nil }