From 0b311c4c1115758764cbbf0199acf4507011e27c Mon Sep 17 00:00:00 2001 From: Nacho Cano Date: Mon, 6 Jul 2020 08:01:24 -0700 Subject: [PATCH 01/12] removing old publisher, ra, and pubsub resources --- pkg/reconciler/events/auditlogs/auditlogs.go | 34 +++++- .../events/auditlogs/auditlogs_test.go | 2 + pkg/reconciler/events/auditlogs/controller.go | 1 + pkg/reconciler/events/scheduler/controller.go | 10 +- pkg/reconciler/events/scheduler/scheduler.go | 14 ++- .../events/scheduler/scheduler_test.go | 3 + pkg/reconciler/events/storage/controller.go | 10 +- pkg/reconciler/events/storage/storage.go | 13 ++- pkg/reconciler/events/storage/storage_test.go | 3 + pkg/reconciler/intevents/controller.go | 13 ++- .../pullsubscription/keda/pullsubscription.go | 27 +++++ .../intevents/pullsubscription/reconciler.go | 60 ++++++++++ pkg/reconciler/intevents/reconciler.go | 36 +++++- pkg/reconciler/intevents/topic/topic.go | 31 ++++- pkg/reconciler/messaging/channel/channel.go | 108 ++++++++++++++---- .../messaging/channel/channel_test.go | 10 +- .../messaging/channel/controller.go | 10 +- 17 files changed, 334 insertions(+), 51 deletions(-) diff --git a/pkg/reconciler/events/auditlogs/auditlogs.go b/pkg/reconciler/events/auditlogs/auditlogs.go index 31564252f4..187e5fb853 100644 --- a/pkg/reconciler/events/auditlogs/auditlogs.go +++ b/pkg/reconciler/events/auditlogs/auditlogs.go @@ -17,9 +17,9 @@ limitations under the License. package auditlogs import ( - "context" - "cloud.google.com/go/logging/logadmin" + "context" + "fmt" "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -92,6 +92,15 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, s *v1beta1.CloudAuditLog } s.Status.StackdriverSink = sink s.Status.MarkSinkReady() + + // TODO remove after 0.16 cut. + if err := c.deleteOldSink(ctx, s); err != nil { + return reconciler.NewEvent(corev1.EventTypeWarning, "DeleteSinkFailed", "Failed to delete StackDriver sink: %s", err.Error()) + } + if err := c.deleteOldPubSubTopic(ctx, s); err != nil { + return reconciler.NewEvent(corev1.EventTypeWarning, "DeletePubSubTopicFailed", "Failed to delete PubSub topic: %s", err.Error()) + } + c.Logger.Debugf("Reconciled Stackdriver sink: %+v", sink) return reconciler.NewEvent(corev1.EventTypeNormal, reconciledSuccessReason, `CloudAuditLogsSource reconciled: "%s/%s"`, s.Namespace, s.Name) @@ -204,3 +213,24 @@ func (c *Reconciler) FinalizeKind(ctx context.Context, s *v1beta1.CloudAuditLogs s.Status.StackdriverSink = "" return nil } + +// TODO remove after 0.16 cut. +func (c *Reconciler) deleteOldSink(ctx context.Context, s *v1beta1.CloudAuditLogsSource) error { + logadminClient, err := c.logadminClientProvider(ctx, s.Status.ProjectID) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to create LogAdmin client", zap.Error(err)) + return err + } + oldSinkName := fmt.Sprintf("sink-%s", string(s.UID)) + if err = logadminClient.DeleteSink(ctx, oldSinkName); status.Code(err) != codes.NotFound { + logging.FromContext(ctx).Desugar().Error("Failed to delete StackDriver sink", zap.String("sinkName", oldSinkName), zap.Error(err)) + return err + } + return nil +} + +// TODO remove after 0.16 cut. +func (c *Reconciler) deleteOldPubSubTopic(ctx context.Context, s *v1beta1.CloudAuditLogsSource) error { + oldTopicName := fmt.Sprintf("cloudauditlogssource-%s", string(s.UID)) + return c.PubSubBase.DeleteOldPubSubTopic(ctx, s, oldTopicName) +} diff --git a/pkg/reconciler/events/auditlogs/auditlogs_test.go b/pkg/reconciler/events/auditlogs/auditlogs_test.go index 8bbb6c6b0c..9aece6de3e 100644 --- a/pkg/reconciler/events/auditlogs/auditlogs_test.go +++ b/pkg/reconciler/events/auditlogs/auditlogs_test.go @@ -1414,6 +1414,8 @@ func TestAllCases(t *testing.T) { ReceiveAdapterName: receiveAdapterName, ReceiveAdapterType: string(converters.CloudAuditLogs), ConfigWatcher: cmw, + // TODO remove after 0.16 cut. + PubsubClientProvider: gpubsub.TestClientCreator(nil), }), Identity: identity.NewIdentity(ctx, NoopIAMPolicyManager, NewGCPAuthTestStore(t, nil)), auditLogsSourceLister: listers.GetCloudAuditLogsSourceLister(), diff --git a/pkg/reconciler/events/auditlogs/controller.go b/pkg/reconciler/events/auditlogs/controller.go index 3de9790488..298d683932 100644 --- a/pkg/reconciler/events/auditlogs/controller.go +++ b/pkg/reconciler/events/auditlogs/controller.go @@ -82,6 +82,7 @@ func newController( ReceiveAdapterName: receiveAdapterName, ReceiveAdapterType: string(converters.CloudAuditLogs), ConfigWatcher: cmw, + PubsubClientProvider: gpubsub.NewClient, }), Identity: identity.NewIdentity(ctx, ipm, gcpas), auditLogsSourceLister: cloudauditlogssourceInformer.Lister(), diff --git a/pkg/reconciler/events/scheduler/controller.go b/pkg/reconciler/events/scheduler/controller.go index b25998ac2c..63ad419fa6 100644 --- a/pkg/reconciler/events/scheduler/controller.go +++ b/pkg/reconciler/events/scheduler/controller.go @@ -37,6 +37,7 @@ import ( pullsubscriptioninformers "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1beta1/pullsubscription" topicinformers "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1beta1/topic" cloudschedulersourcereconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/events/v1beta1/cloudschedulersource" + gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub" gscheduler "github.com/google/knative-gcp/pkg/gclient/scheduler" ) @@ -75,10 +76,11 @@ func newController( c := &Reconciler{ PubSubBase: intevents.NewPubSubBase(ctx, &intevents.PubSubBaseArgs{ - ControllerAgentName: controllerAgentName, - ReceiveAdapterName: receiveAdapterName, - ReceiveAdapterType: string(converters.CloudScheduler), - ConfigWatcher: cmw, + ControllerAgentName: controllerAgentName, + ReceiveAdapterName: receiveAdapterName, + ReceiveAdapterType: string(converters.CloudScheduler), + ConfigWatcher: cmw, + PubsubClientProvider: gpubsub.NewClient, }), Identity: identity.NewIdentity(ctx, ipm, gcpas), schedulerLister: cloudschedulersourceInformer.Lister(), diff --git a/pkg/reconciler/events/scheduler/scheduler.go b/pkg/reconciler/events/scheduler/scheduler.go index 858929e901..380e4ac0e4 100644 --- a/pkg/reconciler/events/scheduler/scheduler.go +++ b/pkg/reconciler/events/scheduler/scheduler.go @@ -18,7 +18,7 @@ package scheduler import ( "context" - + "fmt" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "knative.dev/pkg/logging" @@ -91,6 +91,12 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, scheduler *v1beta1.Cloud return reconciler.NewEvent(corev1.EventTypeWarning, reconciledFailedReason, "Reconcile Job failed with: %s", err.Error()) } scheduler.Status.MarkJobReady(jobName) + + // TODO remove after 0.16 cut. + if err := r.deleteOldPubSubTopic(ctx, scheduler); err != nil { + return reconciler.NewEvent(corev1.EventTypeWarning, "DeletePubSubTopicFailed", "Failed to delete PubSub topic: %s", err.Error()) + } + return reconciler.NewEvent(corev1.EventTypeNormal, reconciledSuccessReason, `CloudSchedulerSource reconciled: "%s/%s"`, scheduler.Namespace, scheduler.Name) } @@ -202,3 +208,9 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, scheduler *v1beta1.CloudS return nil } + +// TODO remove after 0.16 cut. +func (c *Reconciler) deleteOldPubSubTopic(ctx context.Context, scheduler *v1beta1.CloudSchedulerSource) error { + oldTopicName := fmt.Sprintf("scheduler-%s", string(scheduler.UID)) + return c.PubSubBase.DeleteOldPubSubTopic(ctx, scheduler, oldTopicName) +} diff --git a/pkg/reconciler/events/scheduler/scheduler_test.go b/pkg/reconciler/events/scheduler/scheduler_test.go index c9873ce3fd..f80e618c6e 100644 --- a/pkg/reconciler/events/scheduler/scheduler_test.go +++ b/pkg/reconciler/events/scheduler/scheduler_test.go @@ -45,6 +45,7 @@ import ( inteventsv1beta1 "github.com/google/knative-gcp/pkg/apis/intevents/v1beta1" "github.com/google/knative-gcp/pkg/client/injection/reconciler/events/v1beta1/cloudschedulersource" testingMetadataClient "github.com/google/knative-gcp/pkg/gclient/metadata/testing" + gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub/testing" gscheduler "github.com/google/knative-gcp/pkg/gclient/scheduler/testing" "github.com/google/knative-gcp/pkg/pubsub/adapter/converters" "github.com/google/knative-gcp/pkg/reconciler/identity" @@ -1260,6 +1261,8 @@ func TestAllCases(t *testing.T) { ReceiveAdapterName: receiveAdapterName, ReceiveAdapterType: string(converters.CloudScheduler), ConfigWatcher: cmw, + // TODO remove after 0.16 cut. + PubsubClientProvider: gpubsub.TestClientCreator(nil), }), Identity: identity.NewIdentity(ctx, NoopIAMPolicyManager, NewGCPAuthTestStore(t, nil)), schedulerLister: listers.GetCloudSchedulerSourceLister(), diff --git a/pkg/reconciler/events/storage/controller.go b/pkg/reconciler/events/storage/controller.go index 1744a08239..4f97349d0a 100644 --- a/pkg/reconciler/events/storage/controller.go +++ b/pkg/reconciler/events/storage/controller.go @@ -33,6 +33,7 @@ import ( pullsubscriptioninformers "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1beta1/pullsubscription" topicinformers "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1beta1/topic" cloudstoragesourcereconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/events/v1beta1/cloudstoragesource" + gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub" gstorage "github.com/google/knative-gcp/pkg/gclient/storage" "github.com/google/knative-gcp/pkg/reconciler" "github.com/google/knative-gcp/pkg/reconciler/identity" @@ -75,10 +76,11 @@ func newController( r := &Reconciler{ PubSubBase: intevents.NewPubSubBase(ctx, &intevents.PubSubBaseArgs{ - ControllerAgentName: controllerAgentName, - ReceiveAdapterName: receiveAdapterName, - ReceiveAdapterType: string(converters.CloudStorage), - ConfigWatcher: cmw, + ControllerAgentName: controllerAgentName, + ReceiveAdapterName: receiveAdapterName, + ReceiveAdapterType: string(converters.CloudStorage), + ConfigWatcher: cmw, + PubsubClientProvider: gpubsub.NewClient, }), Identity: identity.NewIdentity(ctx, ipm, gcpas), storageLister: cloudstoragesourceInformer.Lister(), diff --git a/pkg/reconciler/events/storage/storage.go b/pkg/reconciler/events/storage/storage.go index 12c9899e68..63b36fde77 100644 --- a/pkg/reconciler/events/storage/storage.go +++ b/pkg/reconciler/events/storage/storage.go @@ -18,7 +18,7 @@ package storage import ( "context" - + "fmt" "go.uber.org/zap" "google.golang.org/grpc/codes" @@ -106,6 +106,11 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, storage *v1beta1.CloudSt } storage.Status.MarkNotificationReady(notification) + // TODO remove after 0.16 cut. + if err := r.deleteOldPubSubTopic(ctx, storage); err != nil { + return reconciler.NewEvent(corev1.EventTypeWarning, "DeletePubSubTopicFailed", "Failed to delete PubSub topic: %s", err.Error()) + } + return reconciler.NewEvent(corev1.EventTypeNormal, reconciledSuccessReason, `CloudStorageSource reconciled: "%s/%s"`, storage.Namespace, storage.Name) } @@ -256,3 +261,9 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, storage *v1beta1.CloudSto // ok to remove finalizer. return nil } + +// TODO remove after 0.16 cut. +func (c *Reconciler) deleteOldPubSubTopic(ctx context.Context, storage *v1beta1.CloudStorageSource) error { + oldTopicName := fmt.Sprintf("storage-%s", string(storage.UID)) + return c.PubSubBase.DeleteOldPubSubTopic(ctx, storage, oldTopicName) +} diff --git a/pkg/reconciler/events/storage/storage_test.go b/pkg/reconciler/events/storage/storage_test.go index 67feb5362a..6f18234d89 100644 --- a/pkg/reconciler/events/storage/storage_test.go +++ b/pkg/reconciler/events/storage/storage_test.go @@ -47,6 +47,7 @@ import ( inteventsv1beta1 "github.com/google/knative-gcp/pkg/apis/intevents/v1beta1" "github.com/google/knative-gcp/pkg/client/injection/reconciler/events/v1beta1/cloudstoragesource" testingMetadataClient "github.com/google/knative-gcp/pkg/gclient/metadata/testing" + gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub/testing" gstorage "github.com/google/knative-gcp/pkg/gclient/storage/testing" "github.com/google/knative-gcp/pkg/pubsub/adapter/converters" "github.com/google/knative-gcp/pkg/reconciler/identity" @@ -1278,6 +1279,8 @@ func TestAllCases(t *testing.T) { ReceiveAdapterName: receiveAdapterName, ReceiveAdapterType: string(converters.CloudStorage), ConfigWatcher: cmw, + // TODO remove after 0.16 cut. + PubsubClientProvider: gpubsub.TestClientCreator(nil), }), Identity: identity.NewIdentity(ctx, NoopIAMPolicyManager, NewGCPAuthTestStore(t, nil)), storageLister: listers.GetCloudStorageSourceLister(), diff --git a/pkg/reconciler/intevents/controller.go b/pkg/reconciler/intevents/controller.go index df4c8c2564..6bb4ec03d1 100644 --- a/pkg/reconciler/intevents/controller.go +++ b/pkg/reconciler/intevents/controller.go @@ -22,14 +22,17 @@ import ( "knative.dev/pkg/configmap" pubsubClient "github.com/google/knative-gcp/pkg/client/injection/client" + gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub" "github.com/google/knative-gcp/pkg/reconciler" ) type PubSubBaseArgs struct { - ControllerAgentName string - ReceiveAdapterName string - ReceiveAdapterType string - ConfigWatcher configmap.Watcher + ControllerAgentName string + ReceiveAdapterName string + ReceiveAdapterType string + ConfigWatcher configmap.Watcher + // TODO remove after 0.16 cut. + PubsubClientProvider gpubsub.CreateFn } func NewPubSubBase(ctx context.Context, args *PubSubBaseArgs) *PubSubBase { @@ -38,5 +41,7 @@ func NewPubSubBase(ctx context.Context, args *PubSubBaseArgs) *PubSubBase { pubsubClient: pubsubClient.Get(ctx), receiveAdapterName: args.ReceiveAdapterName, receiveAdapterType: args.ReceiveAdapterType, + // TODO remove after 0.16 cut. + pubsubClientProvider: args.PubsubClientProvider, } } diff --git a/pkg/reconciler/intevents/pullsubscription/keda/pullsubscription.go b/pkg/reconciler/intevents/pullsubscription/keda/pullsubscription.go index 84b200b787..243a1a8e13 100644 --- a/pkg/reconciler/intevents/pullsubscription/keda/pullsubscription.go +++ b/pkg/reconciler/intevents/pullsubscription/keda/pullsubscription.go @@ -19,6 +19,7 @@ package keda import ( "context" "fmt" + "k8s.io/client-go/dynamic" "strings" "github.com/google/knative-gcp/pkg/apis/intevents/v1beta1" @@ -133,6 +134,12 @@ func (r *Reconciler) ReconcileScaledObject(ctx context.Context, ra *appsv1.Deplo } } + // TODO remove after 0.16 cut. + if err := r.deleteOldScaledObject(ctx, src, scaledObjectResourceInterface); err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to delete old ScaledObject", zap.Error(err)) + return err + } + // TODO propagate ScaledObject status return nil } @@ -140,3 +147,23 @@ func (r *Reconciler) ReconcileScaledObject(ctx context.Context, ra *appsv1.Deplo func (r *Reconciler) FinalizeKind(ctx context.Context, ps *v1beta1.PullSubscription) reconciler.Event { return r.Base.FinalizeKind(ctx, ps) } + +// TODO remove after 0.16 cut. +func (r *Reconciler) deleteOldScaledObject(ctx context.Context, ps *v1beta1.PullSubscription, soResourceInterface dynamic.ResourceInterface) error { + oldName := fmt.Sprintf("cre-so-%s", string(ps.UID)) + _, err := soResourceInterface.Get(oldName, metav1.GetOptions{}) + if err != nil { + if apierrs.IsNotFound(err) { + logging.FromContext(ctx).Desugar().Debug("ScaledObject already deleted", zap.String("so", oldName)) + return nil + } + logging.FromContext(ctx).Desugar().Error("Failed to get ScaledObject", zap.String("so", oldName), zap.Error(err)) + return err + } + err = soResourceInterface.Delete(oldName, &metav1.DeleteOptions{}) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to delete ScaledObject", zap.String("so", oldName), zap.Error(err)) + return err + } + return nil +} diff --git a/pkg/reconciler/intevents/pullsubscription/reconciler.go b/pkg/reconciler/intevents/pullsubscription/reconciler.go index 44ba45d6b2..7878bf89e0 100644 --- a/pkg/reconciler/intevents/pullsubscription/reconciler.go +++ b/pkg/reconciler/intevents/pullsubscription/reconciler.go @@ -21,6 +21,7 @@ import ( "fmt" "time" + "encoding/json" "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" @@ -173,6 +174,13 @@ func (r *Base) reconcileSubscription(ctx context.Context, ps *v1beta1.PullSubscr } defer client.Close() + // TODO remove after 0.16 cut. + err = r.deleteOldSubscription(ctx, ps, client) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to delete Pub/Sub subscription", zap.Error(err)) + return "", err + } + // Generate the subscription name subID := resources.GenerateSubscriptionName(ps) @@ -321,6 +329,13 @@ func (r *Base) reconcileDataPlaneResources(ctx context.Context, ps *v1beta1.Pull TracingConfig: tracingConfig, }) + // TODO remove after the 0.16 cut. + err = r.deleteOldReceiveAdapter(ctx, ps) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Error deleting old receive adapter", zap.Error(err)) + return err + } + return f(ctx, desired, ps) } @@ -439,3 +454,48 @@ func (r *Base) FinalizeKind(ctx context.Context, ps *v1beta1.PullSubscription) r } return nil } + +// TODO remove after 0.16 is cut. +func (r *Base) deleteOldReceiveAdapter(ctx context.Context, ps *v1beta1.PullSubscription) reconciler.Event { + // This is the old receive adapter name. + name := fmt.Sprintf("cre-pull-%s", string(ps.UID)) + ra, err := r.KubeClientSet.AppsV1().Deployments(ps.Namespace).Get(name, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + logging.FromContext(ctx).Desugar().Debug("Receive Adapter already deleted", zap.Error(err)) + return nil + } + logging.FromContext(ctx).Desugar().Error("Failed to get old receive adapter", zap.Error(err)) + return fmt.Errorf("failed to get receive adapter %q for ps %q", name, ps.Name) + } else if !metav1.IsControlledBy(ra, ps) { + adapter, _ := json.Marshal(ra) + logging.FromContext(ctx).Desugar().Error("PullSubscription does not own receive adapter", zap.Any("adapter", adapter)) + return fmt.Errorf("PullSubscription %q does not own receive adapter: %q", ps.Name, name) + } + + err = r.KubeClientSet.AppsV1().Deployments(ps.Namespace).Delete(name, &metav1.DeleteOptions{}) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to delete receive adapter", zap.Error(err)) + return err + } + return nil +} + +// TODO remove after 0.16 is cut. +func (r *Base) deleteOldSubscription(ctx context.Context, ps *v1beta1.PullSubscription, client gpubsub.Client) reconciler.Event { + // This is the name of the old Pub/Sub subscription. + name := fmt.Sprintf("cre-pull-%s", string(ps.UID)) + sub := client.Subscription(name) + exists, err := sub.Exists(ctx) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to verify Pub/Sub subscription exists", zap.Error(err)) + return err + } + if exists { + if err := sub.Delete(ctx); err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to delete Pub/Sub subscription", zap.Error(err)) + return err + } + } + return nil +} diff --git a/pkg/reconciler/intevents/reconciler.go b/pkg/reconciler/intevents/reconciler.go index 2529f32ce2..ffa823b97d 100644 --- a/pkg/reconciler/intevents/reconciler.go +++ b/pkg/reconciler/intevents/reconciler.go @@ -24,13 +24,14 @@ import ( inteventsv1beta1 "github.com/google/knative-gcp/pkg/apis/intevents/v1beta1" clientset "github.com/google/knative-gcp/pkg/client/clientset/versioned" duck "github.com/google/knative-gcp/pkg/duck/v1beta1" + gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub" "github.com/google/knative-gcp/pkg/reconciler" "github.com/google/knative-gcp/pkg/reconciler/intevents/resources" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" apierrs "k8s.io/apimachinery/pkg/api/errors" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/pkg/apis" "knative.dev/pkg/logging" pkgreconciler "knative.dev/pkg/reconciler" @@ -56,6 +57,9 @@ type PubSubBase struct { // What type of receive adapter to use. receiveAdapterType string + + // TODO remove after 0.16 cut. + pubsubClientProvider gpubsub.CreateFn } // ReconcilePubSub reconciles Topic / PullSubscription given a PubSubSpec. @@ -282,3 +286,33 @@ func (psb *PubSubBase) DeletePubSub(ctx context.Context, pubsubable duck.PubSuba status.SinkURI = nil return nil } + +// TODO remove after 0.16 cut. +func (psb *PubSubBase) DeleteOldPubSubTopic(ctx context.Context, pubsubable duck.PubSubable, topic string) error { + // At this point the project ID should have been populated in the status. + // Querying Pub/Sub as the topic could have been deleted outside the cluster (e.g, through gcloud). + status := pubsubable.PubSubStatus() + client, err := psb.pubsubClientProvider(ctx, status.ProjectID) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to create Pub/Sub client", zap.Error(err)) + return err + } + defer client.Close() + + t := client.Topic(topic) + exists, err := t.Exists(ctx) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to verify Pub/Sub topic exists", zap.String("topic", topic), zap.Error(err)) + return err + } + if exists { + // Delete the topic. + if err := t.Delete(ctx); err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to delete Pub/Sub topic", zap.String("topic", topic), zap.Error(err)) + return err + } + } + return nil +} + + diff --git a/pkg/reconciler/intevents/topic/topic.go b/pkg/reconciler/intevents/topic/topic.go index 9cbaf6890f..fd4fd0c259 100644 --- a/pkg/reconciler/intevents/topic/topic.go +++ b/pkg/reconciler/intevents/topic/topic.go @@ -109,7 +109,10 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, topic *v1beta1.Topic) re // If enablePublisher is false, then skip creating the publisher. if enablePublisher := topic.Spec.EnablePublisher; enablePublisher != nil && !*enablePublisher { - // TODO delete previous publishers before the 0.16 cut: https://github.com/google/knative-gcp/issues/1217 + // TODO remove after 0.16 cut. + if err := r.deleteOldPublisher(ctx, topic); err != nil { + return reconciler.NewEvent(corev1.EventTypeWarning, "PublisherDeleteFailed", "Failed to delete publisher: %s", err.Error()) + } return reconciler.NewEvent(corev1.EventTypeNormal, reconciledSuccessReason, `Topic reconciled: "%s/%s"`, topic.Namespace, topic.Name) } @@ -255,6 +258,32 @@ func (r *Reconciler) reconcilePublisher(ctx context.Context, topic *v1beta1.Topi return nil, svc } +// TODO remove after 0.16 cut. +func (r *Reconciler) deleteOldPublisher(ctx context.Context, topic *v1beta1.Topic) error { + // We haven't changed the publisher name, so this can remain as is + name := resources.GeneratePublisherName(topic) + existing, err := r.serviceLister.Services(topic.Namespace).Get(name) + if err != nil { + if apierrors.IsNotFound(err) { + logging.FromContext(ctx).Desugar().Debug("Publisher already deleted", zap.Error(err)) + return nil + } + logging.FromContext(ctx).Desugar().Error("Failed to get publisher", zap.Error(err)) + return fmt.Errorf("failed to get publisher %q for topic %q", name, topic.Name) + } else if !metav1.IsControlledBy(existing, topic) { + p, _ := json.Marshal(existing) + logging.FromContext(ctx).Desugar().Error("Topic does not own publisher service", zap.Any("publisher", p)) + return fmt.Errorf("Topic %q does not own publisher service: %q", topic.Name, name) + } + + err = r.ServingClientSet.ServingV1().Services(topic.Namespace).Delete(name, &metav1.DeleteOptions{}) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to delete publisher", zap.Error(err)) + return err + } + return nil +} + func (r *Reconciler) UpdateFromTracingConfigMap(cfg *corev1.ConfigMap) { if cfg == nil { r.Logger.Error("Tracing ConfigMap is nil") diff --git a/pkg/reconciler/messaging/channel/channel.go b/pkg/reconciler/messaging/channel/channel.go index 91ca4dc54b..1e51ec4325 100644 --- a/pkg/reconciler/messaging/channel/channel.go +++ b/pkg/reconciler/messaging/channel/channel.go @@ -19,11 +19,11 @@ package channel import ( "context" "fmt" + "github.com/google/knative-gcp/pkg/utils" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" - apierrors "k8s.io/apimachinery/pkg/api/errors" apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -37,6 +37,8 @@ import ( channelreconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/messaging/v1beta1/channel" inteventslisters "github.com/google/knative-gcp/pkg/client/listers/intevents/v1beta1" listers "github.com/google/knative-gcp/pkg/client/listers/messaging/v1beta1" + metadataClient "github.com/google/knative-gcp/pkg/gclient/metadata" + gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub" "github.com/google/knative-gcp/pkg/reconciler" "github.com/google/knative-gcp/pkg/reconciler/identity" "github.com/google/knative-gcp/pkg/reconciler/messaging/channel/resources" @@ -61,6 +63,9 @@ type Reconciler struct { // listers index properties about resources channelLister listers.ChannelLister topicLister inteventslisters.TopicLister + + // TODO remove after 0.16 cut. + pubsubClientProvider gpubsub.CreateFn } // Check that our Reconciler implements Interface. @@ -100,6 +105,11 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *v1beta1.Channel return pkgreconciler.NewEvent(corev1.EventTypeWarning, reconciledSubscribersStatusFailedReason, "Reconcile Subscribers Status failed with: %s", err.Error()) } + // TODO remove after 0.16 cut. + if err := r.deleteOldPubSubTopic(ctx, channel); err != nil { + return pkgreconciler.NewEvent(corev1.EventTypeWarning, "DeletePubSubTopicFailed", "Failed to delete PubSub topic: %s", err.Error()) + } + return pkgreconciler.NewEvent(corev1.EventTypeNormal, reconciledSuccessReason, `Channel reconciled: "%s/%s"`, channel.Namespace, channel.Name) } @@ -286,22 +296,12 @@ func (r *Reconciler) syncSubscribersStatus(ctx context.Context, channel *v1beta1 func (r *Reconciler) reconcileTopic(ctx context.Context, channel *v1beta1.Channel) (*inteventsv1beta1.Topic, error) { topic, err := r.getTopic(ctx, channel) - if err != nil && !apierrors.IsNotFound(err) { - logging.FromContext(ctx).Desugar().Error("Unable to get a Topic", zap.Error(err)) - return nil, err - } - if topic != nil { - if topic.Status.Address != nil { - channel.Status.SetAddress(topic.Status.Address.URL) - } else { - channel.Status.SetAddress(nil) - } - return topic, nil - } + clusterName := channel.GetAnnotations()[duckv1beta1.ClusterNameAnnotation] + name := resources.GeneratePublisherName(channel) t := resources.MakeTopic(&resources.TopicArgs{ Owner: channel, - Name: resources.GeneratePublisherName(channel), + Name: name, Project: channel.Spec.Project, ServiceAccountName: channel.Spec.ServiceAccountName, Secret: channel.Spec.Secret, @@ -310,14 +310,43 @@ func (r *Reconciler) reconcileTopic(ctx context.Context, channel *v1beta1.Channe Annotations: resources.GetTopicAnnotations(clusterName), }) - topic, err = r.RunClientSet.InternalV1beta1().Topics(channel.Namespace).Create(t) - if err != nil { - logging.FromContext(ctx).Desugar().Error("Failed to create Topic", zap.Error(err)) - r.Recorder.Eventf(channel, corev1.EventTypeWarning, "TopicCreateFailed", "Failed to created Topic %q: %s", topic.Name, err.Error()) - return nil, err + if apierrs.IsNotFound(err) { + topic, err = r.RunClientSet.InternalV1beta1().Topics(channel.Namespace).Create(t) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to create Topic", zap.Error(err)) + r.Recorder.Eventf(channel, corev1.EventTypeWarning, "TopicCreateFailed", "Failed to created Topic %q: %s", topic.Name, err.Error()) + return nil, err + } + r.Recorder.Eventf(channel, corev1.EventTypeNormal, "TopicCreated", "Created Topic %q", topic.Name) + return topic, nil + } else if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to get Topic", zap.Error(err)) + return nil, fmt.Errorf("failed to get Topic: %w", err) + } else if !metav1.IsControlledBy(topic, channel) { + channel.Status.MarkTopicNotOwned("Topic %q is owned by another resource.", name) + return nil, fmt.Errorf("Channel: %s does not own Topic: %s", channel.Name, name) + } else if !equality.Semantic.DeepDerivative(t.Spec, topic.Spec) { + // Don't modify the informers copy. + desired := topic.DeepCopy() + desired.Spec = t.Spec + logging.FromContext(ctx).Desugar().Debug("Updating Topic", zap.Any("topic", desired)) + t, err = r.RunClientSet.InternalV1beta1().Topics(channel.Namespace).Update(desired) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to update Topic", zap.Any("topic", topic), zap.Error(err)) + return nil, fmt.Errorf("failed to update Topic: %w", err) + } + return t, nil + } + + if topic != nil { + if topic.Status.Address != nil { + channel.Status.SetAddress(topic.Status.Address.URL) + } else { + channel.Status.SetAddress(nil) + } } - r.Recorder.Eventf(channel, corev1.EventTypeNormal, "TopicCreated", "Created Topic %q", topic.Name) - return topic, err + + return topic, nil } func (r *Reconciler) getTopic(_ context.Context, channel *v1beta1.Channel) (*inteventsv1beta1.Topic, error) { @@ -326,10 +355,6 @@ func (r *Reconciler) getTopic(_ context.Context, channel *v1beta1.Channel) (*int if err != nil { return nil, err } - if !metav1.IsControlledBy(topic, channel) { - channel.Status.MarkTopicNotOwned("Topic %q is owned by another resource.", name) - return nil, fmt.Errorf("Channel: %s does not own Topic: %s", channel.Name, name) - } return topic, nil } @@ -378,3 +403,36 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, channel *v1beta1.Channel) return nil } + +// TODO remove after 0.16 cut. +func (r *Reconciler) deleteOldPubSubTopic(ctx context.Context, channel *v1beta1.Channel) error { + oldTopicName := fmt.Sprintf("cre-chan-%s", string(channel.UID)) + + projectID, err := utils.ProjectID(channel.Spec.Project, metadataClient.NewDefaultMetadataClient()) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to find project id", zap.Error(err)) + return err + } + + client, err := r.pubsubClientProvider(ctx, projectID) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to create Pub/Sub client", zap.Error(err)) + return err + } + defer client.Close() + + t := client.Topic(oldTopicName) + exists, err := t.Exists(ctx) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to verify Pub/Sub topic exists", zap.String("topic", oldTopicName), zap.Error(err)) + return err + } + if exists { + // Delete the topic. + if err := t.Delete(ctx); err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to delete Pub/Sub topic", zap.String("topic", oldTopicName), zap.Error(err)) + return err + } + } + return nil +} diff --git a/pkg/reconciler/messaging/channel/channel_test.go b/pkg/reconciler/messaging/channel/channel_test.go index a424affa1e..de08079802 100644 --- a/pkg/reconciler/messaging/channel/channel_test.go +++ b/pkg/reconciler/messaging/channel/channel_test.go @@ -42,6 +42,7 @@ import ( "github.com/google/knative-gcp/pkg/apis/messaging/v1beta1" "github.com/google/knative-gcp/pkg/client/injection/reconciler/messaging/v1beta1/channel" testingMetadataClient "github.com/google/knative-gcp/pkg/gclient/metadata/testing" + gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub/testing" "github.com/google/knative-gcp/pkg/reconciler" "github.com/google/knative-gcp/pkg/reconciler/identity" "github.com/google/knative-gcp/pkg/reconciler/messaging/channel/resources" @@ -499,10 +500,11 @@ func TestAllCases(t *testing.T) { defer logtesting.ClearAll() table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher, _ map[string]interface{}) controller.Reconciler { r := &Reconciler{ - Base: reconciler.NewBase(ctx, controllerAgentName, cmw), - Identity: identity.NewIdentity(ctx, NoopIAMPolicyManager, NewGCPAuthTestStore(t, nil)), - channelLister: listers.GetChannelLister(), - topicLister: listers.GetTopicLister(), + Base: reconciler.NewBase(ctx, controllerAgentName, cmw), + Identity: identity.NewIdentity(ctx, NoopIAMPolicyManager, NewGCPAuthTestStore(t, nil)), + channelLister: listers.GetChannelLister(), + topicLister: listers.GetTopicLister(), + pubsubClientProvider: gpubsub.TestClientCreator(nil), } return channel.NewReconciler(ctx, r.Logger, r.RunClientSet, listers.GetChannelLister(), r.Recorder, r) })) diff --git a/pkg/reconciler/messaging/channel/controller.go b/pkg/reconciler/messaging/channel/controller.go index d12ad01511..335b692e0e 100644 --- a/pkg/reconciler/messaging/channel/controller.go +++ b/pkg/reconciler/messaging/channel/controller.go @@ -31,6 +31,7 @@ import ( topicinformer "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1beta1/topic" channelinformer "github.com/google/knative-gcp/pkg/client/injection/informers/messaging/v1beta1/channel" channelreconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/messaging/v1beta1/channel" + gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub" "github.com/google/knative-gcp/pkg/reconciler" "github.com/google/knative-gcp/pkg/reconciler/identity" "github.com/google/knative-gcp/pkg/reconciler/identity/iam" @@ -68,10 +69,11 @@ func newController( serviceAccountInformer := serviceaccountinformers.Get(ctx) r := &Reconciler{ - Base: reconciler.NewBase(ctx, controllerAgentName, cmw), - Identity: identity.NewIdentity(ctx, ipm, gcpas), - channelLister: channelInformer.Lister(), - topicLister: topicInformer.Lister(), + Base: reconciler.NewBase(ctx, controllerAgentName, cmw), + Identity: identity.NewIdentity(ctx, ipm, gcpas), + channelLister: channelInformer.Lister(), + topicLister: topicInformer.Lister(), + pubsubClientProvider: gpubsub.NewClient, } impl := channelreconciler.NewImpl(ctx, r) From 9d2afc1877090134b07ab9f04eeae9642d41e7f0 Mon Sep 17 00:00:00 2001 From: Nacho Cano Date: Mon, 6 Jul 2020 10:48:31 -0700 Subject: [PATCH 02/12] removing topic and creating it again for sources when the spec.topic changed. That can only happen on updates to 0.16 --- pkg/reconciler/events/auditlogs/auditlogs.go | 9 ---- .../events/auditlogs/auditlogs_test.go | 2 - pkg/reconciler/events/auditlogs/controller.go | 1 - pkg/reconciler/events/scheduler/controller.go | 10 ++-- pkg/reconciler/events/scheduler/scheduler.go | 12 ----- .../events/scheduler/scheduler_test.go | 3 -- pkg/reconciler/events/storage/controller.go | 10 ++-- pkg/reconciler/events/storage/storage.go | 12 ----- pkg/reconciler/events/storage/storage_test.go | 3 -- pkg/reconciler/intevents/controller.go | 13 ++--- pkg/reconciler/intevents/reconciler.go | 50 ++++++------------- 11 files changed, 28 insertions(+), 97 deletions(-) diff --git a/pkg/reconciler/events/auditlogs/auditlogs.go b/pkg/reconciler/events/auditlogs/auditlogs.go index 187e5fb853..83bd98d4d7 100644 --- a/pkg/reconciler/events/auditlogs/auditlogs.go +++ b/pkg/reconciler/events/auditlogs/auditlogs.go @@ -97,9 +97,6 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, s *v1beta1.CloudAuditLog if err := c.deleteOldSink(ctx, s); err != nil { return reconciler.NewEvent(corev1.EventTypeWarning, "DeleteSinkFailed", "Failed to delete StackDriver sink: %s", err.Error()) } - if err := c.deleteOldPubSubTopic(ctx, s); err != nil { - return reconciler.NewEvent(corev1.EventTypeWarning, "DeletePubSubTopicFailed", "Failed to delete PubSub topic: %s", err.Error()) - } c.Logger.Debugf("Reconciled Stackdriver sink: %+v", sink) @@ -228,9 +225,3 @@ func (c *Reconciler) deleteOldSink(ctx context.Context, s *v1beta1.CloudAuditLog } return nil } - -// TODO remove after 0.16 cut. -func (c *Reconciler) deleteOldPubSubTopic(ctx context.Context, s *v1beta1.CloudAuditLogsSource) error { - oldTopicName := fmt.Sprintf("cloudauditlogssource-%s", string(s.UID)) - return c.PubSubBase.DeleteOldPubSubTopic(ctx, s, oldTopicName) -} diff --git a/pkg/reconciler/events/auditlogs/auditlogs_test.go b/pkg/reconciler/events/auditlogs/auditlogs_test.go index 9aece6de3e..8bbb6c6b0c 100644 --- a/pkg/reconciler/events/auditlogs/auditlogs_test.go +++ b/pkg/reconciler/events/auditlogs/auditlogs_test.go @@ -1414,8 +1414,6 @@ func TestAllCases(t *testing.T) { ReceiveAdapterName: receiveAdapterName, ReceiveAdapterType: string(converters.CloudAuditLogs), ConfigWatcher: cmw, - // TODO remove after 0.16 cut. - PubsubClientProvider: gpubsub.TestClientCreator(nil), }), Identity: identity.NewIdentity(ctx, NoopIAMPolicyManager, NewGCPAuthTestStore(t, nil)), auditLogsSourceLister: listers.GetCloudAuditLogsSourceLister(), diff --git a/pkg/reconciler/events/auditlogs/controller.go b/pkg/reconciler/events/auditlogs/controller.go index 298d683932..3de9790488 100644 --- a/pkg/reconciler/events/auditlogs/controller.go +++ b/pkg/reconciler/events/auditlogs/controller.go @@ -82,7 +82,6 @@ func newController( ReceiveAdapterName: receiveAdapterName, ReceiveAdapterType: string(converters.CloudAuditLogs), ConfigWatcher: cmw, - PubsubClientProvider: gpubsub.NewClient, }), Identity: identity.NewIdentity(ctx, ipm, gcpas), auditLogsSourceLister: cloudauditlogssourceInformer.Lister(), diff --git a/pkg/reconciler/events/scheduler/controller.go b/pkg/reconciler/events/scheduler/controller.go index 63ad419fa6..b25998ac2c 100644 --- a/pkg/reconciler/events/scheduler/controller.go +++ b/pkg/reconciler/events/scheduler/controller.go @@ -37,7 +37,6 @@ import ( pullsubscriptioninformers "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1beta1/pullsubscription" topicinformers "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1beta1/topic" cloudschedulersourcereconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/events/v1beta1/cloudschedulersource" - gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub" gscheduler "github.com/google/knative-gcp/pkg/gclient/scheduler" ) @@ -76,11 +75,10 @@ func newController( c := &Reconciler{ PubSubBase: intevents.NewPubSubBase(ctx, &intevents.PubSubBaseArgs{ - ControllerAgentName: controllerAgentName, - ReceiveAdapterName: receiveAdapterName, - ReceiveAdapterType: string(converters.CloudScheduler), - ConfigWatcher: cmw, - PubsubClientProvider: gpubsub.NewClient, + ControllerAgentName: controllerAgentName, + ReceiveAdapterName: receiveAdapterName, + ReceiveAdapterType: string(converters.CloudScheduler), + ConfigWatcher: cmw, }), Identity: identity.NewIdentity(ctx, ipm, gcpas), schedulerLister: cloudschedulersourceInformer.Lister(), diff --git a/pkg/reconciler/events/scheduler/scheduler.go b/pkg/reconciler/events/scheduler/scheduler.go index 380e4ac0e4..cff9b40cbb 100644 --- a/pkg/reconciler/events/scheduler/scheduler.go +++ b/pkg/reconciler/events/scheduler/scheduler.go @@ -18,7 +18,6 @@ package scheduler import ( "context" - "fmt" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "knative.dev/pkg/logging" @@ -92,11 +91,6 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, scheduler *v1beta1.Cloud } scheduler.Status.MarkJobReady(jobName) - // TODO remove after 0.16 cut. - if err := r.deleteOldPubSubTopic(ctx, scheduler); err != nil { - return reconciler.NewEvent(corev1.EventTypeWarning, "DeletePubSubTopicFailed", "Failed to delete PubSub topic: %s", err.Error()) - } - return reconciler.NewEvent(corev1.EventTypeNormal, reconciledSuccessReason, `CloudSchedulerSource reconciled: "%s/%s"`, scheduler.Namespace, scheduler.Name) } @@ -208,9 +202,3 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, scheduler *v1beta1.CloudS return nil } - -// TODO remove after 0.16 cut. -func (c *Reconciler) deleteOldPubSubTopic(ctx context.Context, scheduler *v1beta1.CloudSchedulerSource) error { - oldTopicName := fmt.Sprintf("scheduler-%s", string(scheduler.UID)) - return c.PubSubBase.DeleteOldPubSubTopic(ctx, scheduler, oldTopicName) -} diff --git a/pkg/reconciler/events/scheduler/scheduler_test.go b/pkg/reconciler/events/scheduler/scheduler_test.go index f80e618c6e..c9873ce3fd 100644 --- a/pkg/reconciler/events/scheduler/scheduler_test.go +++ b/pkg/reconciler/events/scheduler/scheduler_test.go @@ -45,7 +45,6 @@ import ( inteventsv1beta1 "github.com/google/knative-gcp/pkg/apis/intevents/v1beta1" "github.com/google/knative-gcp/pkg/client/injection/reconciler/events/v1beta1/cloudschedulersource" testingMetadataClient "github.com/google/knative-gcp/pkg/gclient/metadata/testing" - gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub/testing" gscheduler "github.com/google/knative-gcp/pkg/gclient/scheduler/testing" "github.com/google/knative-gcp/pkg/pubsub/adapter/converters" "github.com/google/knative-gcp/pkg/reconciler/identity" @@ -1261,8 +1260,6 @@ func TestAllCases(t *testing.T) { ReceiveAdapterName: receiveAdapterName, ReceiveAdapterType: string(converters.CloudScheduler), ConfigWatcher: cmw, - // TODO remove after 0.16 cut. - PubsubClientProvider: gpubsub.TestClientCreator(nil), }), Identity: identity.NewIdentity(ctx, NoopIAMPolicyManager, NewGCPAuthTestStore(t, nil)), schedulerLister: listers.GetCloudSchedulerSourceLister(), diff --git a/pkg/reconciler/events/storage/controller.go b/pkg/reconciler/events/storage/controller.go index 4f97349d0a..1744a08239 100644 --- a/pkg/reconciler/events/storage/controller.go +++ b/pkg/reconciler/events/storage/controller.go @@ -33,7 +33,6 @@ import ( pullsubscriptioninformers "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1beta1/pullsubscription" topicinformers "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1beta1/topic" cloudstoragesourcereconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/events/v1beta1/cloudstoragesource" - gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub" gstorage "github.com/google/knative-gcp/pkg/gclient/storage" "github.com/google/knative-gcp/pkg/reconciler" "github.com/google/knative-gcp/pkg/reconciler/identity" @@ -76,11 +75,10 @@ func newController( r := &Reconciler{ PubSubBase: intevents.NewPubSubBase(ctx, &intevents.PubSubBaseArgs{ - ControllerAgentName: controllerAgentName, - ReceiveAdapterName: receiveAdapterName, - ReceiveAdapterType: string(converters.CloudStorage), - ConfigWatcher: cmw, - PubsubClientProvider: gpubsub.NewClient, + ControllerAgentName: controllerAgentName, + ReceiveAdapterName: receiveAdapterName, + ReceiveAdapterType: string(converters.CloudStorage), + ConfigWatcher: cmw, }), Identity: identity.NewIdentity(ctx, ipm, gcpas), storageLister: cloudstoragesourceInformer.Lister(), diff --git a/pkg/reconciler/events/storage/storage.go b/pkg/reconciler/events/storage/storage.go index 63b36fde77..c7189e2ccf 100644 --- a/pkg/reconciler/events/storage/storage.go +++ b/pkg/reconciler/events/storage/storage.go @@ -18,7 +18,6 @@ package storage import ( "context" - "fmt" "go.uber.org/zap" "google.golang.org/grpc/codes" @@ -106,11 +105,6 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, storage *v1beta1.CloudSt } storage.Status.MarkNotificationReady(notification) - // TODO remove after 0.16 cut. - if err := r.deleteOldPubSubTopic(ctx, storage); err != nil { - return reconciler.NewEvent(corev1.EventTypeWarning, "DeletePubSubTopicFailed", "Failed to delete PubSub topic: %s", err.Error()) - } - return reconciler.NewEvent(corev1.EventTypeNormal, reconciledSuccessReason, `CloudStorageSource reconciled: "%s/%s"`, storage.Namespace, storage.Name) } @@ -261,9 +255,3 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, storage *v1beta1.CloudSto // ok to remove finalizer. return nil } - -// TODO remove after 0.16 cut. -func (c *Reconciler) deleteOldPubSubTopic(ctx context.Context, storage *v1beta1.CloudStorageSource) error { - oldTopicName := fmt.Sprintf("storage-%s", string(storage.UID)) - return c.PubSubBase.DeleteOldPubSubTopic(ctx, storage, oldTopicName) -} diff --git a/pkg/reconciler/events/storage/storage_test.go b/pkg/reconciler/events/storage/storage_test.go index 6f18234d89..67feb5362a 100644 --- a/pkg/reconciler/events/storage/storage_test.go +++ b/pkg/reconciler/events/storage/storage_test.go @@ -47,7 +47,6 @@ import ( inteventsv1beta1 "github.com/google/knative-gcp/pkg/apis/intevents/v1beta1" "github.com/google/knative-gcp/pkg/client/injection/reconciler/events/v1beta1/cloudstoragesource" testingMetadataClient "github.com/google/knative-gcp/pkg/gclient/metadata/testing" - gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub/testing" gstorage "github.com/google/knative-gcp/pkg/gclient/storage/testing" "github.com/google/knative-gcp/pkg/pubsub/adapter/converters" "github.com/google/knative-gcp/pkg/reconciler/identity" @@ -1279,8 +1278,6 @@ func TestAllCases(t *testing.T) { ReceiveAdapterName: receiveAdapterName, ReceiveAdapterType: string(converters.CloudStorage), ConfigWatcher: cmw, - // TODO remove after 0.16 cut. - PubsubClientProvider: gpubsub.TestClientCreator(nil), }), Identity: identity.NewIdentity(ctx, NoopIAMPolicyManager, NewGCPAuthTestStore(t, nil)), storageLister: listers.GetCloudStorageSourceLister(), diff --git a/pkg/reconciler/intevents/controller.go b/pkg/reconciler/intevents/controller.go index 6bb4ec03d1..df4c8c2564 100644 --- a/pkg/reconciler/intevents/controller.go +++ b/pkg/reconciler/intevents/controller.go @@ -22,17 +22,14 @@ import ( "knative.dev/pkg/configmap" pubsubClient "github.com/google/knative-gcp/pkg/client/injection/client" - gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub" "github.com/google/knative-gcp/pkg/reconciler" ) type PubSubBaseArgs struct { - ControllerAgentName string - ReceiveAdapterName string - ReceiveAdapterType string - ConfigWatcher configmap.Watcher - // TODO remove after 0.16 cut. - PubsubClientProvider gpubsub.CreateFn + ControllerAgentName string + ReceiveAdapterName string + ReceiveAdapterType string + ConfigWatcher configmap.Watcher } func NewPubSubBase(ctx context.Context, args *PubSubBaseArgs) *PubSubBase { @@ -41,7 +38,5 @@ func NewPubSubBase(ctx context.Context, args *PubSubBaseArgs) *PubSubBase { pubsubClient: pubsubClient.Get(ctx), receiveAdapterName: args.ReceiveAdapterName, receiveAdapterType: args.ReceiveAdapterType, - // TODO remove after 0.16 cut. - pubsubClientProvider: args.PubsubClientProvider, } } diff --git a/pkg/reconciler/intevents/reconciler.go b/pkg/reconciler/intevents/reconciler.go index ffa823b97d..45545a03df 100644 --- a/pkg/reconciler/intevents/reconciler.go +++ b/pkg/reconciler/intevents/reconciler.go @@ -24,7 +24,6 @@ import ( inteventsv1beta1 "github.com/google/knative-gcp/pkg/apis/intevents/v1beta1" clientset "github.com/google/knative-gcp/pkg/client/clientset/versioned" duck "github.com/google/knative-gcp/pkg/duck/v1beta1" - gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub" "github.com/google/knative-gcp/pkg/reconciler" "github.com/google/knative-gcp/pkg/reconciler/intevents/resources" "go.uber.org/zap" @@ -57,9 +56,6 @@ type PubSubBase struct { // What type of receive adapter to use. receiveAdapterType string - - // TODO remove after 0.16 cut. - pubsubClientProvider gpubsub.CreateFn } // ReconcilePubSub reconciles Topic / PullSubscription given a PubSubSpec. @@ -110,6 +106,22 @@ func (psb *PubSubBase) reconcileTopic(ctx context.Context, pubsubable duck.PubSu } else if err != nil { logging.FromContext(ctx).Desugar().Error("Failed to get Topic", zap.Error(err)) return nil, fmt.Errorf("failed to get Topic: %w", err) + // TODO remove this else if after 0.16 cut. + } else if newTopic.Spec.Topic != t.Spec.Topic { + // We check whether the topic changed. This can only happen when updating to 0.16 as the spec.topic is immutable. + // We have to delete the oldTopic and create a new one here. + logging.FromContext(ctx).Desugar().Info("Deleting old Topic", zap.Any("topic", t)) + err = topics.Delete(t.Name, nil) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to delete old Topic", zap.Any("topic", t), zap.Error(err)) + return nil, fmt.Errorf("failed to update Topic: %w", err) + } + logging.FromContext(ctx).Desugar().Debug("Creating new Topic", zap.Any("topic", newTopic)) + t, err = topics.Create(newTopic) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to create Topic", zap.Any("topic", newTopic), zap.Error(err)) + return nil, fmt.Errorf("failed to create Topic: %w", err) + } // Check whether the specs differ and update the Topic if so. } else if !equality.Semantic.DeepDerivative(newTopic.Spec, t.Spec) { // Don't modify the informers copy. @@ -286,33 +298,3 @@ func (psb *PubSubBase) DeletePubSub(ctx context.Context, pubsubable duck.PubSuba status.SinkURI = nil return nil } - -// TODO remove after 0.16 cut. -func (psb *PubSubBase) DeleteOldPubSubTopic(ctx context.Context, pubsubable duck.PubSubable, topic string) error { - // At this point the project ID should have been populated in the status. - // Querying Pub/Sub as the topic could have been deleted outside the cluster (e.g, through gcloud). - status := pubsubable.PubSubStatus() - client, err := psb.pubsubClientProvider(ctx, status.ProjectID) - if err != nil { - logging.FromContext(ctx).Desugar().Error("Failed to create Pub/Sub client", zap.Error(err)) - return err - } - defer client.Close() - - t := client.Topic(topic) - exists, err := t.Exists(ctx) - if err != nil { - logging.FromContext(ctx).Desugar().Error("Failed to verify Pub/Sub topic exists", zap.String("topic", topic), zap.Error(err)) - return err - } - if exists { - // Delete the topic. - if err := t.Delete(ctx); err != nil { - logging.FromContext(ctx).Desugar().Error("Failed to delete Pub/Sub topic", zap.String("topic", topic), zap.Error(err)) - return err - } - } - return nil -} - - From a202d66c7e299d73a045caa4a0eed063d5cc7c32 Mon Sep 17 00:00:00 2001 From: Nacho Cano Date: Mon, 6 Jul 2020 11:20:16 -0700 Subject: [PATCH 03/12] updates for channel --- pkg/reconciler/messaging/channel/channel.go | 62 +++++-------------- .../messaging/channel/channel_test.go | 10 ++- .../messaging/channel/controller.go | 10 ++- 3 files changed, 25 insertions(+), 57 deletions(-) diff --git a/pkg/reconciler/messaging/channel/channel.go b/pkg/reconciler/messaging/channel/channel.go index 1e51ec4325..da04ce9dd8 100644 --- a/pkg/reconciler/messaging/channel/channel.go +++ b/pkg/reconciler/messaging/channel/channel.go @@ -19,8 +19,6 @@ package channel import ( "context" "fmt" - "github.com/google/knative-gcp/pkg/utils" - "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" @@ -37,8 +35,6 @@ import ( channelreconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/messaging/v1beta1/channel" inteventslisters "github.com/google/knative-gcp/pkg/client/listers/intevents/v1beta1" listers "github.com/google/knative-gcp/pkg/client/listers/messaging/v1beta1" - metadataClient "github.com/google/knative-gcp/pkg/gclient/metadata" - gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub" "github.com/google/knative-gcp/pkg/reconciler" "github.com/google/knative-gcp/pkg/reconciler/identity" "github.com/google/knative-gcp/pkg/reconciler/messaging/channel/resources" @@ -63,9 +59,6 @@ type Reconciler struct { // listers index properties about resources channelLister listers.ChannelLister topicLister inteventslisters.TopicLister - - // TODO remove after 0.16 cut. - pubsubClientProvider gpubsub.CreateFn } // Check that our Reconciler implements Interface. @@ -105,11 +98,6 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *v1beta1.Channel return pkgreconciler.NewEvent(corev1.EventTypeWarning, reconciledSubscribersStatusFailedReason, "Reconcile Subscribers Status failed with: %s", err.Error()) } - // TODO remove after 0.16 cut. - if err := r.deleteOldPubSubTopic(ctx, channel); err != nil { - return pkgreconciler.NewEvent(corev1.EventTypeWarning, "DeletePubSubTopicFailed", "Failed to delete PubSub topic: %s", err.Error()) - } - return pkgreconciler.NewEvent(corev1.EventTypeNormal, reconciledSuccessReason, `Channel reconciled: "%s/%s"`, channel.Namespace, channel.Name) } @@ -325,6 +313,23 @@ func (r *Reconciler) reconcileTopic(ctx context.Context, channel *v1beta1.Channe } else if !metav1.IsControlledBy(topic, channel) { channel.Status.MarkTopicNotOwned("Topic %q is owned by another resource.", name) return nil, fmt.Errorf("Channel: %s does not own Topic: %s", channel.Name, name) + // TODO remove this else if after 0.16 cut. + } else if t.Spec.Topic != topic.Spec.Topic { + // We check whether the topic changed. This can only happen when updating to 0.16 as the spec.topic is immutable. + // We have to delete the oldTopic and create a new one here. + logging.FromContext(ctx).Desugar().Info("Deleting old Topic", zap.Any("topic", topic)) + err = r.RunClientSet.InternalV1beta1().Topics(channel.Namespace).Delete(topic.Name, nil) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to delete old Topic", zap.Any("topic", topic), zap.Error(err)) + return nil, fmt.Errorf("failed to update Topic: %w", err) + } + logging.FromContext(ctx).Desugar().Debug("Creating new Topic", zap.Any("topic", t)) + t, err = r.RunClientSet.InternalV1beta1().Topics(channel.Namespace).Create(t) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to create Topic", zap.Any("topic", t), zap.Error(err)) + return nil, fmt.Errorf("failed to create Topic: %w", err) + } + return t, nil } else if !equality.Semantic.DeepDerivative(t.Spec, topic.Spec) { // Don't modify the informers copy. desired := topic.DeepCopy() @@ -403,36 +408,3 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, channel *v1beta1.Channel) return nil } - -// TODO remove after 0.16 cut. -func (r *Reconciler) deleteOldPubSubTopic(ctx context.Context, channel *v1beta1.Channel) error { - oldTopicName := fmt.Sprintf("cre-chan-%s", string(channel.UID)) - - projectID, err := utils.ProjectID(channel.Spec.Project, metadataClient.NewDefaultMetadataClient()) - if err != nil { - logging.FromContext(ctx).Desugar().Error("Failed to find project id", zap.Error(err)) - return err - } - - client, err := r.pubsubClientProvider(ctx, projectID) - if err != nil { - logging.FromContext(ctx).Desugar().Error("Failed to create Pub/Sub client", zap.Error(err)) - return err - } - defer client.Close() - - t := client.Topic(oldTopicName) - exists, err := t.Exists(ctx) - if err != nil { - logging.FromContext(ctx).Desugar().Error("Failed to verify Pub/Sub topic exists", zap.String("topic", oldTopicName), zap.Error(err)) - return err - } - if exists { - // Delete the topic. - if err := t.Delete(ctx); err != nil { - logging.FromContext(ctx).Desugar().Error("Failed to delete Pub/Sub topic", zap.String("topic", oldTopicName), zap.Error(err)) - return err - } - } - return nil -} diff --git a/pkg/reconciler/messaging/channel/channel_test.go b/pkg/reconciler/messaging/channel/channel_test.go index de08079802..a424affa1e 100644 --- a/pkg/reconciler/messaging/channel/channel_test.go +++ b/pkg/reconciler/messaging/channel/channel_test.go @@ -42,7 +42,6 @@ import ( "github.com/google/knative-gcp/pkg/apis/messaging/v1beta1" "github.com/google/knative-gcp/pkg/client/injection/reconciler/messaging/v1beta1/channel" testingMetadataClient "github.com/google/knative-gcp/pkg/gclient/metadata/testing" - gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub/testing" "github.com/google/knative-gcp/pkg/reconciler" "github.com/google/knative-gcp/pkg/reconciler/identity" "github.com/google/knative-gcp/pkg/reconciler/messaging/channel/resources" @@ -500,11 +499,10 @@ func TestAllCases(t *testing.T) { defer logtesting.ClearAll() table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher, _ map[string]interface{}) controller.Reconciler { r := &Reconciler{ - Base: reconciler.NewBase(ctx, controllerAgentName, cmw), - Identity: identity.NewIdentity(ctx, NoopIAMPolicyManager, NewGCPAuthTestStore(t, nil)), - channelLister: listers.GetChannelLister(), - topicLister: listers.GetTopicLister(), - pubsubClientProvider: gpubsub.TestClientCreator(nil), + Base: reconciler.NewBase(ctx, controllerAgentName, cmw), + Identity: identity.NewIdentity(ctx, NoopIAMPolicyManager, NewGCPAuthTestStore(t, nil)), + channelLister: listers.GetChannelLister(), + topicLister: listers.GetTopicLister(), } return channel.NewReconciler(ctx, r.Logger, r.RunClientSet, listers.GetChannelLister(), r.Recorder, r) })) diff --git a/pkg/reconciler/messaging/channel/controller.go b/pkg/reconciler/messaging/channel/controller.go index 335b692e0e..d12ad01511 100644 --- a/pkg/reconciler/messaging/channel/controller.go +++ b/pkg/reconciler/messaging/channel/controller.go @@ -31,7 +31,6 @@ import ( topicinformer "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1beta1/topic" channelinformer "github.com/google/knative-gcp/pkg/client/injection/informers/messaging/v1beta1/channel" channelreconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/messaging/v1beta1/channel" - gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub" "github.com/google/knative-gcp/pkg/reconciler" "github.com/google/knative-gcp/pkg/reconciler/identity" "github.com/google/knative-gcp/pkg/reconciler/identity/iam" @@ -69,11 +68,10 @@ func newController( serviceAccountInformer := serviceaccountinformers.Get(ctx) r := &Reconciler{ - Base: reconciler.NewBase(ctx, controllerAgentName, cmw), - Identity: identity.NewIdentity(ctx, ipm, gcpas), - channelLister: channelInformer.Lister(), - topicLister: topicInformer.Lister(), - pubsubClientProvider: gpubsub.NewClient, + Base: reconciler.NewBase(ctx, controllerAgentName, cmw), + Identity: identity.NewIdentity(ctx, ipm, gcpas), + channelLister: channelInformer.Lister(), + topicLister: topicInformer.Lister(), } impl := channelreconciler.NewImpl(ctx, r) From 8299d146a2df7eff30817eec4c445743f0d8d110 Mon Sep 17 00:00:00 2001 From: Nacho Cano Date: Mon, 6 Jul 2020 11:24:03 -0700 Subject: [PATCH 04/12] nits --- pkg/reconciler/events/scheduler/scheduler.go | 1 - .../intevents/pullsubscription/keda/pullsubscription.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/reconciler/events/scheduler/scheduler.go b/pkg/reconciler/events/scheduler/scheduler.go index cff9b40cbb..91373490e8 100644 --- a/pkg/reconciler/events/scheduler/scheduler.go +++ b/pkg/reconciler/events/scheduler/scheduler.go @@ -90,7 +90,6 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, scheduler *v1beta1.Cloud return reconciler.NewEvent(corev1.EventTypeWarning, reconciledFailedReason, "Reconcile Job failed with: %s", err.Error()) } scheduler.Status.MarkJobReady(jobName) - return reconciler.NewEvent(corev1.EventTypeNormal, reconciledSuccessReason, `CloudSchedulerSource reconciled: "%s/%s"`, scheduler.Namespace, scheduler.Name) } diff --git a/pkg/reconciler/intevents/pullsubscription/keda/pullsubscription.go b/pkg/reconciler/intevents/pullsubscription/keda/pullsubscription.go index 243a1a8e13..e23d48734a 100644 --- a/pkg/reconciler/intevents/pullsubscription/keda/pullsubscription.go +++ b/pkg/reconciler/intevents/pullsubscription/keda/pullsubscription.go @@ -19,7 +19,6 @@ package keda import ( "context" "fmt" - "k8s.io/client-go/dynamic" "strings" "github.com/google/knative-gcp/pkg/apis/intevents/v1beta1" @@ -34,6 +33,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/discovery" + "k8s.io/client-go/dynamic" eventingduck "knative.dev/eventing/pkg/duck" "knative.dev/pkg/logging" From 70a7c9a21de89aefe0df5c33b2b5a7751aab25c3 Mon Sep 17 00:00:00 2001 From: Nacho Cano Date: Mon, 6 Jul 2020 11:38:50 -0700 Subject: [PATCH 05/12] deleting and creating PS again. That will take care of removing the old RA and the old Pub/Sub subscription --- .../intevents/pullsubscription/reconciler.go | 60 ------------------- pkg/reconciler/intevents/reconciler.go | 18 +++++- 2 files changed, 17 insertions(+), 61 deletions(-) diff --git a/pkg/reconciler/intevents/pullsubscription/reconciler.go b/pkg/reconciler/intevents/pullsubscription/reconciler.go index 7878bf89e0..44ba45d6b2 100644 --- a/pkg/reconciler/intevents/pullsubscription/reconciler.go +++ b/pkg/reconciler/intevents/pullsubscription/reconciler.go @@ -21,7 +21,6 @@ import ( "fmt" "time" - "encoding/json" "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" @@ -174,13 +173,6 @@ func (r *Base) reconcileSubscription(ctx context.Context, ps *v1beta1.PullSubscr } defer client.Close() - // TODO remove after 0.16 cut. - err = r.deleteOldSubscription(ctx, ps, client) - if err != nil { - logging.FromContext(ctx).Desugar().Error("Failed to delete Pub/Sub subscription", zap.Error(err)) - return "", err - } - // Generate the subscription name subID := resources.GenerateSubscriptionName(ps) @@ -329,13 +321,6 @@ func (r *Base) reconcileDataPlaneResources(ctx context.Context, ps *v1beta1.Pull TracingConfig: tracingConfig, }) - // TODO remove after the 0.16 cut. - err = r.deleteOldReceiveAdapter(ctx, ps) - if err != nil { - logging.FromContext(ctx).Desugar().Error("Error deleting old receive adapter", zap.Error(err)) - return err - } - return f(ctx, desired, ps) } @@ -454,48 +439,3 @@ func (r *Base) FinalizeKind(ctx context.Context, ps *v1beta1.PullSubscription) r } return nil } - -// TODO remove after 0.16 is cut. -func (r *Base) deleteOldReceiveAdapter(ctx context.Context, ps *v1beta1.PullSubscription) reconciler.Event { - // This is the old receive adapter name. - name := fmt.Sprintf("cre-pull-%s", string(ps.UID)) - ra, err := r.KubeClientSet.AppsV1().Deployments(ps.Namespace).Get(name, metav1.GetOptions{}) - if err != nil { - if apierrors.IsNotFound(err) { - logging.FromContext(ctx).Desugar().Debug("Receive Adapter already deleted", zap.Error(err)) - return nil - } - logging.FromContext(ctx).Desugar().Error("Failed to get old receive adapter", zap.Error(err)) - return fmt.Errorf("failed to get receive adapter %q for ps %q", name, ps.Name) - } else if !metav1.IsControlledBy(ra, ps) { - adapter, _ := json.Marshal(ra) - logging.FromContext(ctx).Desugar().Error("PullSubscription does not own receive adapter", zap.Any("adapter", adapter)) - return fmt.Errorf("PullSubscription %q does not own receive adapter: %q", ps.Name, name) - } - - err = r.KubeClientSet.AppsV1().Deployments(ps.Namespace).Delete(name, &metav1.DeleteOptions{}) - if err != nil { - logging.FromContext(ctx).Desugar().Error("Failed to delete receive adapter", zap.Error(err)) - return err - } - return nil -} - -// TODO remove after 0.16 is cut. -func (r *Base) deleteOldSubscription(ctx context.Context, ps *v1beta1.PullSubscription, client gpubsub.Client) reconciler.Event { - // This is the name of the old Pub/Sub subscription. - name := fmt.Sprintf("cre-pull-%s", string(ps.UID)) - sub := client.Subscription(name) - exists, err := sub.Exists(ctx) - if err != nil { - logging.FromContext(ctx).Desugar().Error("Failed to verify Pub/Sub subscription exists", zap.Error(err)) - return err - } - if exists { - if err := sub.Delete(ctx); err != nil { - logging.FromContext(ctx).Desugar().Error("Failed to delete Pub/Sub subscription", zap.Error(err)) - return err - } - } - return nil -} diff --git a/pkg/reconciler/intevents/reconciler.go b/pkg/reconciler/intevents/reconciler.go index 45545a03df..896f0c9e84 100644 --- a/pkg/reconciler/intevents/reconciler.go +++ b/pkg/reconciler/intevents/reconciler.go @@ -114,7 +114,7 @@ func (psb *PubSubBase) reconcileTopic(ctx context.Context, pubsubable duck.PubSu err = topics.Delete(t.Name, nil) if err != nil { logging.FromContext(ctx).Desugar().Error("Failed to delete old Topic", zap.Any("topic", t), zap.Error(err)) - return nil, fmt.Errorf("failed to update Topic: %w", err) + return nil, fmt.Errorf("failed to delete Topic: %w", err) } logging.FromContext(ctx).Desugar().Debug("Creating new Topic", zap.Any("topic", newTopic)) t, err = topics.Create(newTopic) @@ -183,6 +183,22 @@ func (psb *PubSubBase) ReconcilePullSubscription(ctx context.Context, pubsubable logging.FromContext(ctx).Desugar().Error("Failed to create PullSubscription", zap.Any("ps", newPS), zap.Error(err)) return nil, pkgreconciler.NewEvent(corev1.EventTypeWarning, pullSubscriptionCreateFailedReason, "Creating PullSubscription failed with: %s", err.Error()) } + // TODO remove this else if after 0.16 cut. + } else if newPS.Spec.Topic != ps.Spec.Topic { + // We check whether the topic changed. This can only happen when updating to 0.16 as the spec.topic is immutable. + // We have to delete the old PS and create a new one here. + logging.FromContext(ctx).Desugar().Info("Deleting old PullSubscription", zap.Any("ps", ps)) + err = pullSubscriptions.Delete(ps.Name, nil) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to delete old PullSubscription", zap.Any("ps", ps), zap.Error(err)) + return nil, fmt.Errorf("failed to delete Pullsubscription: %w", err) + } + logging.FromContext(ctx).Desugar().Debug("Creating new PullSubscription", zap.Any("topic", newPS)) + ps, err = pullSubscriptions.Create(newPS) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to create PullSubscription", zap.Any("ps", newPS), zap.Error(err)) + return nil, fmt.Errorf("failed to create PullSubscription: %w", err) + } // Check whether the specs differ and update the PS if so. } else if !equality.Semantic.DeepDerivative(newPS.Spec, ps.Spec) { // Don't modify the informers copy. From b44b84cb50f676d60a059d85e762181ca82cba87 Mon Sep 17 00:00:00 2001 From: Nacho Cano Date: Mon, 6 Jul 2020 12:02:03 -0700 Subject: [PATCH 06/12] deleting and creating PS again. That will take care of removing the old RA and the old Pub/Sub subscription --- pkg/reconciler/intevents/reconciler.go | 2 +- pkg/reconciler/intevents/topic/topic.go | 30 --------------------- pkg/reconciler/messaging/channel/channel.go | 16 +++++++++++ 3 files changed, 17 insertions(+), 31 deletions(-) diff --git a/pkg/reconciler/intevents/reconciler.go b/pkg/reconciler/intevents/reconciler.go index 896f0c9e84..ad5ed953db 100644 --- a/pkg/reconciler/intevents/reconciler.go +++ b/pkg/reconciler/intevents/reconciler.go @@ -193,7 +193,7 @@ func (psb *PubSubBase) ReconcilePullSubscription(ctx context.Context, pubsubable logging.FromContext(ctx).Desugar().Error("Failed to delete old PullSubscription", zap.Any("ps", ps), zap.Error(err)) return nil, fmt.Errorf("failed to delete Pullsubscription: %w", err) } - logging.FromContext(ctx).Desugar().Debug("Creating new PullSubscription", zap.Any("topic", newPS)) + logging.FromContext(ctx).Desugar().Debug("Creating new PullSubscription", zap.Any("ps", newPS)) ps, err = pullSubscriptions.Create(newPS) if err != nil { logging.FromContext(ctx).Desugar().Error("Failed to create PullSubscription", zap.Any("ps", newPS), zap.Error(err)) diff --git a/pkg/reconciler/intevents/topic/topic.go b/pkg/reconciler/intevents/topic/topic.go index fd4fd0c259..cd943c1e37 100644 --- a/pkg/reconciler/intevents/topic/topic.go +++ b/pkg/reconciler/intevents/topic/topic.go @@ -109,10 +109,6 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, topic *v1beta1.Topic) re // If enablePublisher is false, then skip creating the publisher. if enablePublisher := topic.Spec.EnablePublisher; enablePublisher != nil && !*enablePublisher { - // TODO remove after 0.16 cut. - if err := r.deleteOldPublisher(ctx, topic); err != nil { - return reconciler.NewEvent(corev1.EventTypeWarning, "PublisherDeleteFailed", "Failed to delete publisher: %s", err.Error()) - } return reconciler.NewEvent(corev1.EventTypeNormal, reconciledSuccessReason, `Topic reconciled: "%s/%s"`, topic.Namespace, topic.Name) } @@ -258,32 +254,6 @@ func (r *Reconciler) reconcilePublisher(ctx context.Context, topic *v1beta1.Topi return nil, svc } -// TODO remove after 0.16 cut. -func (r *Reconciler) deleteOldPublisher(ctx context.Context, topic *v1beta1.Topic) error { - // We haven't changed the publisher name, so this can remain as is - name := resources.GeneratePublisherName(topic) - existing, err := r.serviceLister.Services(topic.Namespace).Get(name) - if err != nil { - if apierrors.IsNotFound(err) { - logging.FromContext(ctx).Desugar().Debug("Publisher already deleted", zap.Error(err)) - return nil - } - logging.FromContext(ctx).Desugar().Error("Failed to get publisher", zap.Error(err)) - return fmt.Errorf("failed to get publisher %q for topic %q", name, topic.Name) - } else if !metav1.IsControlledBy(existing, topic) { - p, _ := json.Marshal(existing) - logging.FromContext(ctx).Desugar().Error("Topic does not own publisher service", zap.Any("publisher", p)) - return fmt.Errorf("Topic %q does not own publisher service: %q", topic.Name, name) - } - - err = r.ServingClientSet.ServingV1().Services(topic.Namespace).Delete(name, &metav1.DeleteOptions{}) - if err != nil { - logging.FromContext(ctx).Desugar().Error("Failed to delete publisher", zap.Error(err)) - return err - } - return nil -} - func (r *Reconciler) UpdateFromTracingConfigMap(cfg *corev1.ConfigMap) { if cfg == nil { r.Logger.Error("Tracing ConfigMap is nil") diff --git a/pkg/reconciler/messaging/channel/channel.go b/pkg/reconciler/messaging/channel/channel.go index da04ce9dd8..89469b4bb8 100644 --- a/pkg/reconciler/messaging/channel/channel.go +++ b/pkg/reconciler/messaging/channel/channel.go @@ -211,6 +211,22 @@ func (r *Reconciler) syncSubscribers(ctx context.Context, channel *v1beta1.Chann return err } r.Recorder.Eventf(channel, corev1.EventTypeNormal, "SubscriberCreated", "Created Subscriber %q", ps.Name) + // TODO remove this else if after 0.16 cut. + } else if ps.Spec.Topic != existingPs.Spec.Topic { + // We check whether the topic changed. This can only happen when updating to 0.16 as the spec.topic is immutable. + // We have to delete the old PS and create a new one here. + logging.FromContext(ctx).Desugar().Info("Deleting old PullSubscription", zap.Any("ps", existingPs)) + err := r.RunClientSet.InternalV1beta1().PullSubscriptions(channel.Namespace).Delete(existingPs.Name, nil) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to delete old PullSubscription", zap.Any("ps", existingPs), zap.Error(err)) + return fmt.Errorf("failed to delete Pullsubscription: %w", err) + } + logging.FromContext(ctx).Desugar().Debug("Creating new PullSubscription", zap.Any("ps", ps)) + ps, err = r.RunClientSet.InternalV1beta1().PullSubscriptions(channel.Namespace).Create(ps) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to create PullSubscription", zap.Any("ps", ps), zap.Error(err)) + return fmt.Errorf("failed to create PullSubscription: %w", err) + } } else if !equality.Semantic.DeepEqual(ps.Spec, existingPs.Spec) { // Don't modify the informers copy. desired := existingPs.DeepCopy() From 167157b98accee25281f57257fb3ce03900ecd8c Mon Sep 17 00:00:00 2001 From: Nacho Cano Date: Mon, 6 Jul 2020 12:16:38 -0700 Subject: [PATCH 07/12] no need to remove ScaledObject --- .../pullsubscription/keda/pullsubscription.go | 27 ------------------- 1 file changed, 27 deletions(-) diff --git a/pkg/reconciler/intevents/pullsubscription/keda/pullsubscription.go b/pkg/reconciler/intevents/pullsubscription/keda/pullsubscription.go index e23d48734a..84b200b787 100644 --- a/pkg/reconciler/intevents/pullsubscription/keda/pullsubscription.go +++ b/pkg/reconciler/intevents/pullsubscription/keda/pullsubscription.go @@ -33,7 +33,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/discovery" - "k8s.io/client-go/dynamic" eventingduck "knative.dev/eventing/pkg/duck" "knative.dev/pkg/logging" @@ -134,12 +133,6 @@ func (r *Reconciler) ReconcileScaledObject(ctx context.Context, ra *appsv1.Deplo } } - // TODO remove after 0.16 cut. - if err := r.deleteOldScaledObject(ctx, src, scaledObjectResourceInterface); err != nil { - logging.FromContext(ctx).Desugar().Error("Failed to delete old ScaledObject", zap.Error(err)) - return err - } - // TODO propagate ScaledObject status return nil } @@ -147,23 +140,3 @@ func (r *Reconciler) ReconcileScaledObject(ctx context.Context, ra *appsv1.Deplo func (r *Reconciler) FinalizeKind(ctx context.Context, ps *v1beta1.PullSubscription) reconciler.Event { return r.Base.FinalizeKind(ctx, ps) } - -// TODO remove after 0.16 cut. -func (r *Reconciler) deleteOldScaledObject(ctx context.Context, ps *v1beta1.PullSubscription, soResourceInterface dynamic.ResourceInterface) error { - oldName := fmt.Sprintf("cre-so-%s", string(ps.UID)) - _, err := soResourceInterface.Get(oldName, metav1.GetOptions{}) - if err != nil { - if apierrs.IsNotFound(err) { - logging.FromContext(ctx).Desugar().Debug("ScaledObject already deleted", zap.String("so", oldName)) - return nil - } - logging.FromContext(ctx).Desugar().Error("Failed to get ScaledObject", zap.String("so", oldName), zap.Error(err)) - return err - } - err = soResourceInterface.Delete(oldName, &metav1.DeleteOptions{}) - if err != nil { - logging.FromContext(ctx).Desugar().Error("Failed to delete ScaledObject", zap.String("so", oldName), zap.Error(err)) - return err - } - return nil -} From 5605de1d804a8c5138e700649b13f48c0f0d9649 Mon Sep 17 00:00:00 2001 From: Nacho Cano Date: Mon, 6 Jul 2020 15:33:24 -0700 Subject: [PATCH 08/12] some review comments --- pkg/reconciler/events/auditlogs/auditlogs.go | 2 +- pkg/reconciler/messaging/channel/channel.go | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/reconciler/events/auditlogs/auditlogs.go b/pkg/reconciler/events/auditlogs/auditlogs.go index 83bd98d4d7..9a7ea314c9 100644 --- a/pkg/reconciler/events/auditlogs/auditlogs.go +++ b/pkg/reconciler/events/auditlogs/auditlogs.go @@ -95,7 +95,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, s *v1beta1.CloudAuditLog // TODO remove after 0.16 cut. if err := c.deleteOldSink(ctx, s); err != nil { - return reconciler.NewEvent(corev1.EventTypeWarning, "DeleteSinkFailed", "Failed to delete StackDriver sink: %s", err.Error()) + return reconciler.NewEvent(corev1.EventTypeWarning, "DeleteOldSinkFailed", "Failed to delete old StackDriver sink: %s", err.Error()) } c.Logger.Debugf("Reconciled Stackdriver sink: %+v", sink) diff --git a/pkg/reconciler/messaging/channel/channel.go b/pkg/reconciler/messaging/channel/channel.go index 89469b4bb8..982ceb37b3 100644 --- a/pkg/reconciler/messaging/channel/channel.go +++ b/pkg/reconciler/messaging/channel/channel.go @@ -299,8 +299,6 @@ func (r *Reconciler) syncSubscribersStatus(ctx context.Context, channel *v1beta1 } func (r *Reconciler) reconcileTopic(ctx context.Context, channel *v1beta1.Channel) (*inteventsv1beta1.Topic, error) { - topic, err := r.getTopic(ctx, channel) - clusterName := channel.GetAnnotations()[duckv1beta1.ClusterNameAnnotation] name := resources.GeneratePublisherName(channel) t := resources.MakeTopic(&resources.TopicArgs{ @@ -314,6 +312,7 @@ func (r *Reconciler) reconcileTopic(ctx context.Context, channel *v1beta1.Channe Annotations: resources.GetTopicAnnotations(clusterName), }) + topic, err := r.getTopic(ctx, channel) if apierrs.IsNotFound(err) { topic, err = r.RunClientSet.InternalV1beta1().Topics(channel.Namespace).Create(t) if err != nil { From 8970337f4b9d1574f62df17367657e113bf8cbaf Mon Sep 17 00:00:00 2001 From: Nacho Cano Date: Mon, 6 Jul 2020 17:07:26 -0700 Subject: [PATCH 09/12] getting error deleting the sink. Not deleting it and things can keep working --- pkg/reconciler/events/auditlogs/auditlogs.go | 21 -------------------- 1 file changed, 21 deletions(-) diff --git a/pkg/reconciler/events/auditlogs/auditlogs.go b/pkg/reconciler/events/auditlogs/auditlogs.go index 9a7ea314c9..0160f77798 100644 --- a/pkg/reconciler/events/auditlogs/auditlogs.go +++ b/pkg/reconciler/events/auditlogs/auditlogs.go @@ -19,7 +19,6 @@ package auditlogs import ( "cloud.google.com/go/logging/logadmin" "context" - "fmt" "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -93,11 +92,6 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, s *v1beta1.CloudAuditLog s.Status.StackdriverSink = sink s.Status.MarkSinkReady() - // TODO remove after 0.16 cut. - if err := c.deleteOldSink(ctx, s); err != nil { - return reconciler.NewEvent(corev1.EventTypeWarning, "DeleteOldSinkFailed", "Failed to delete old StackDriver sink: %s", err.Error()) - } - c.Logger.Debugf("Reconciled Stackdriver sink: %+v", sink) return reconciler.NewEvent(corev1.EventTypeNormal, reconciledSuccessReason, `CloudAuditLogsSource reconciled: "%s/%s"`, s.Namespace, s.Name) @@ -210,18 +204,3 @@ func (c *Reconciler) FinalizeKind(ctx context.Context, s *v1beta1.CloudAuditLogs s.Status.StackdriverSink = "" return nil } - -// TODO remove after 0.16 cut. -func (c *Reconciler) deleteOldSink(ctx context.Context, s *v1beta1.CloudAuditLogsSource) error { - logadminClient, err := c.logadminClientProvider(ctx, s.Status.ProjectID) - if err != nil { - logging.FromContext(ctx).Desugar().Error("Failed to create LogAdmin client", zap.Error(err)) - return err - } - oldSinkName := fmt.Sprintf("sink-%s", string(s.UID)) - if err = logadminClient.DeleteSink(ctx, oldSinkName); status.Code(err) != codes.NotFound { - logging.FromContext(ctx).Desugar().Error("Failed to delete StackDriver sink", zap.String("sinkName", oldSinkName), zap.Error(err)) - return err - } - return nil -} From 56b83e786ac6535f85f2021474a522f965db7b3e Mon Sep 17 00:00:00 2001 From: Nacho Cano Date: Mon, 6 Jul 2020 18:04:08 -0700 Subject: [PATCH 10/12] updating scheduler target pubsub topic --- pkg/gclient/scheduler/client.go | 7 +++- pkg/gclient/scheduler/interfaces.go | 2 + pkg/gclient/scheduler/testing/client.go | 11 ++++++ pkg/reconciler/events/scheduler/scheduler.go | 39 ++++++++++++++++---- 4 files changed, 51 insertions(+), 8 deletions(-) diff --git a/pkg/gclient/scheduler/client.go b/pkg/gclient/scheduler/client.go index 66f79f5f73..fbf4bb2f3d 100644 --- a/pkg/gclient/scheduler/client.go +++ b/pkg/gclient/scheduler/client.go @@ -19,7 +19,7 @@ package scheduler import ( "context" - scheduler "cloud.google.com/go/scheduler/apiv1" + "cloud.google.com/go/scheduler/apiv1" "github.com/googleapis/gax-go/v2" "google.golang.org/api/option" schedulerpb "google.golang.org/genproto/googleapis/cloud/scheduler/v1" @@ -57,6 +57,11 @@ func (c *schedulerClient) CreateJob(ctx context.Context, req *schedulerpb.Create return c.client.CreateJob(ctx, req, opts...) } +// CreateJob implements scheduler.CloudSchedulerClient.UpdateJobRequest +func (c *schedulerClient) UpdateJob(ctx context.Context, req *schedulerpb.UpdateJobRequest, opts ...gax.CallOption) (*schedulerpb.Job, error) { + return c.client.UpdateJob(ctx, req, opts...) +} + // DeleteJob implements scheduler.CloudSchedulerClient.DeleteJob func (c *schedulerClient) DeleteJob(ctx context.Context, req *schedulerpb.DeleteJobRequest, opts ...gax.CallOption) error { return c.client.DeleteJob(ctx, req, opts...) diff --git a/pkg/gclient/scheduler/interfaces.go b/pkg/gclient/scheduler/interfaces.go index 09198f3bd0..7a06c60f13 100644 --- a/pkg/gclient/scheduler/interfaces.go +++ b/pkg/gclient/scheduler/interfaces.go @@ -30,6 +30,8 @@ type Client interface { Close() error // CreateJob see https://godoc.org/cloud.google.com/go/scheduler/apiv1#CloudSchedulerClient.CreateJob CreateJob(ctx context.Context, req *schedulerpb.CreateJobRequest, opts ...gax.CallOption) (*schedulerpb.Job, error) + // UpdateJob see https://godoc.org/cloud.google.com/go/scheduler/apiv1#CloudSchedulerClient.UpdateJob + UpdateJob(ctx context.Context, req *schedulerpb.UpdateJobRequest, opts ...gax.CallOption) (*schedulerpb.Job, error) // DeleteJob see https://godoc.org/cloud.google.com/go/scheduler/apiv1#CloudSchedulerClient.DeleteJob DeleteJob(ctx context.Context, req *schedulerpb.DeleteJobRequest, opts ...gax.CallOption) error // GetJob see https://godoc.org/cloud.google.com/go/scheduler/apiv1#CloudSchedulerClient.GetJob diff --git a/pkg/gclient/scheduler/testing/client.go b/pkg/gclient/scheduler/testing/client.go index 04c9e45031..0e4d2f387b 100644 --- a/pkg/gclient/scheduler/testing/client.go +++ b/pkg/gclient/scheduler/testing/client.go @@ -51,6 +51,7 @@ type TestClientData struct { CreateClientErr error CreateJobErr error DeleteJobErr error + UpdateJobErr error GetJobErr error CloseErr error } @@ -83,6 +84,16 @@ func (c *testClient) DeleteJob(ctx context.Context, req *schedulerpb.DeleteJobRe return c.data.DeleteJobErr } +// CreateJob implements client.UpdateJob +func (c *testClient) UpdateJob(ctx context.Context, req *schedulerpb.UpdateJobRequest, opts ...gax.CallOption) (*schedulerpb.Job, error) { + if c.data.UpdateJobErr != nil { + return nil, c.data.UpdateJobErr + } + return &schedulerpb.Job{ + Name: req.Job.Name, + }, nil +} + // GetJob implements client.GetJob func (c *testClient) GetJob(ctx context.Context, req *schedulerpb.GetJobRequest, opts ...gax.CallOption) (*schedulerpb.Job, error) { if c.data.GetJobErr != nil { diff --git a/pkg/reconciler/events/scheduler/scheduler.go b/pkg/reconciler/events/scheduler/scheduler.go index 91373490e8..4215022ed1 100644 --- a/pkg/reconciler/events/scheduler/scheduler.go +++ b/pkg/reconciler/events/scheduler/scheduler.go @@ -19,13 +19,12 @@ package scheduler import ( "context" "go.uber.org/zap" - corev1 "k8s.io/api/core/v1" - "knative.dev/pkg/logging" - "knative.dev/pkg/reconciler" - schedulerpb "google.golang.org/genproto/googleapis/cloud/scheduler/v1" "google.golang.org/grpc/codes" gstatus "google.golang.org/grpc/status" + corev1 "k8s.io/api/core/v1" + "knative.dev/pkg/logging" + "knative.dev/pkg/reconciler" "github.com/google/knative-gcp/pkg/apis/events/v1beta1" cloudschedulersourcereconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/events/v1beta1/cloudschedulersource" @@ -111,8 +110,10 @@ func (r *Reconciler) reconcileJob(ctx context.Context, scheduler *v1beta1.CloudS } defer client.Close() + pubsubTargetName := resources.GeneratePubSubTargetTopic(scheduler, topic) + // Check if the job exists. - _, err = client.GetJob(ctx, &schedulerpb.GetJobRequest{Name: jobName}) + job, err := client.GetJob(ctx, &schedulerpb.GetJobRequest{Name: jobName}) if err != nil { if st, ok := gstatus.FromError(err); !ok { logging.FromContext(ctx).Desugar().Error("Failed from CloudSchedulerSource client while retrieving CloudSchedulerSource job", zap.String("jobName", jobName), zap.Error(err)) @@ -120,7 +121,7 @@ func (r *Reconciler) reconcileJob(ctx context.Context, scheduler *v1beta1.CloudS } else if st.Code() == codes.NotFound { // Create the job as it does not exist. For creation, we need a parent, extract it from the jobName. parent := resources.ExtractParentName(jobName) - // Add our jobName, and schedulerName as customAttributes. + // Add schedulerName as customAttribute. customAttributes := map[string]string{ v1beta1.CloudSchedulerSourceJobName: jobName, } @@ -130,7 +131,7 @@ func (r *Reconciler) reconcileJob(ctx context.Context, scheduler *v1beta1.CloudS Name: jobName, Target: &schedulerpb.Job_PubsubTarget{ PubsubTarget: &schedulerpb.PubsubTarget{ - TopicName: resources.GeneratePubSubTargetTopic(scheduler, topic), + TopicName: pubsubTargetName, Data: []byte(scheduler.Spec.Data), Attributes: customAttributes, }, @@ -147,6 +148,30 @@ func (r *Reconciler) reconcileJob(ctx context.Context, scheduler *v1beta1.CloudS return err } } + // TODO remove after 0.16 cut. + actualTarget := job.GetPubsubTarget() + if actualTarget != nil && actualTarget.TopicName != pubsubTargetName { + // This means that it is using a topic with an old name. We will update the target. + _, err = client.UpdateJob(ctx, &schedulerpb.UpdateJobRequest{ + Job: &schedulerpb.Job{ + Name: job.Name, + Target: &schedulerpb.Job_PubsubTarget{ + PubsubTarget: &schedulerpb.PubsubTarget{ + TopicName: pubsubTargetName, + Data: actualTarget.Data, + Attributes: actualTarget.Attributes, + }, + }, + // Needed to add these two here otherwise I was getting an update error. + Schedule: job.Schedule, + TimeZone: job.TimeZone, + }, + }) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to update old CloudSchedulerSource job", zap.String("jobName", jobName), zap.Error(err)) + return err + } + } return nil } From f70dfda30b6140d373fd5169890156764e1a3092 Mon Sep 17 00:00:00 2001 From: Nacho Cano Date: Mon, 6 Jul 2020 18:17:55 -0700 Subject: [PATCH 11/12] fixing storage --- pkg/reconciler/events/storage/storage.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/pkg/reconciler/events/storage/storage.go b/pkg/reconciler/events/storage/storage.go index c7189e2ccf..37658f1819 100644 --- a/pkg/reconciler/events/storage/storage.go +++ b/pkg/reconciler/events/storage/storage.go @@ -146,7 +146,19 @@ func (r *Reconciler) reconcileNotification(ctx context.Context, storage *v1beta1 // If the notification does exist, then return its ID. if existing, ok := notifications[storage.Status.NotificationID]; ok { - return existing.ID, nil + // TODO remove after the 0.16 cut. + // If the notification exists, need to check whether it is using the updated topic name. + // If not, then we delete it and create it again. + if existing.TopicID != storage.Status.TopicID { + err := bucket.DeleteNotification(ctx, storage.Status.NotificationID) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Failed to delete old CloudStorageSource notification", zap.Error(err)) + return "", err + } + // We let the creation to happen after this enclosing if, thus we do not return here and need this other else. + } else { + return existing.ID, nil + } } // If the notification does not exist, then create it. From 84556735c80322b388e918b24bd3d4109a6d8e41 Mon Sep 17 00:00:00 2001 From: Nacho Cano Date: Mon, 6 Jul 2020 21:04:12 -0700 Subject: [PATCH 12/12] nits --- pkg/gclient/scheduler/client.go | 2 +- pkg/gclient/scheduler/testing/client.go | 2 +- pkg/reconciler/events/auditlogs/auditlogs.go | 4 ++-- pkg/reconciler/events/scheduler/scheduler.go | 2 +- pkg/reconciler/events/storage/storage.go | 1 + pkg/reconciler/messaging/channel/channel.go | 1 + 6 files changed, 7 insertions(+), 5 deletions(-) diff --git a/pkg/gclient/scheduler/client.go b/pkg/gclient/scheduler/client.go index fbf4bb2f3d..408af57e29 100644 --- a/pkg/gclient/scheduler/client.go +++ b/pkg/gclient/scheduler/client.go @@ -57,7 +57,7 @@ func (c *schedulerClient) CreateJob(ctx context.Context, req *schedulerpb.Create return c.client.CreateJob(ctx, req, opts...) } -// CreateJob implements scheduler.CloudSchedulerClient.UpdateJobRequest +// UpdateJob implements scheduler.CloudSchedulerClient.UpdateJobRequest func (c *schedulerClient) UpdateJob(ctx context.Context, req *schedulerpb.UpdateJobRequest, opts ...gax.CallOption) (*schedulerpb.Job, error) { return c.client.UpdateJob(ctx, req, opts...) } diff --git a/pkg/gclient/scheduler/testing/client.go b/pkg/gclient/scheduler/testing/client.go index 0e4d2f387b..c6728b16e4 100644 --- a/pkg/gclient/scheduler/testing/client.go +++ b/pkg/gclient/scheduler/testing/client.go @@ -84,7 +84,7 @@ func (c *testClient) DeleteJob(ctx context.Context, req *schedulerpb.DeleteJobRe return c.data.DeleteJobErr } -// CreateJob implements client.UpdateJob +// UpdateJob implements client.UpdateJob func (c *testClient) UpdateJob(ctx context.Context, req *schedulerpb.UpdateJobRequest, opts ...gax.CallOption) (*schedulerpb.Job, error) { if c.data.UpdateJobErr != nil { return nil, c.data.UpdateJobErr diff --git a/pkg/reconciler/events/auditlogs/auditlogs.go b/pkg/reconciler/events/auditlogs/auditlogs.go index 0160f77798..31564252f4 100644 --- a/pkg/reconciler/events/auditlogs/auditlogs.go +++ b/pkg/reconciler/events/auditlogs/auditlogs.go @@ -17,8 +17,9 @@ limitations under the License. package auditlogs import ( - "cloud.google.com/go/logging/logadmin" "context" + + "cloud.google.com/go/logging/logadmin" "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -91,7 +92,6 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, s *v1beta1.CloudAuditLog } s.Status.StackdriverSink = sink s.Status.MarkSinkReady() - c.Logger.Debugf("Reconciled Stackdriver sink: %+v", sink) return reconciler.NewEvent(corev1.EventTypeNormal, reconciledSuccessReason, `CloudAuditLogsSource reconciled: "%s/%s"`, s.Namespace, s.Name) diff --git a/pkg/reconciler/events/scheduler/scheduler.go b/pkg/reconciler/events/scheduler/scheduler.go index 4215022ed1..c2887c93ec 100644 --- a/pkg/reconciler/events/scheduler/scheduler.go +++ b/pkg/reconciler/events/scheduler/scheduler.go @@ -121,7 +121,7 @@ func (r *Reconciler) reconcileJob(ctx context.Context, scheduler *v1beta1.CloudS } else if st.Code() == codes.NotFound { // Create the job as it does not exist. For creation, we need a parent, extract it from the jobName. parent := resources.ExtractParentName(jobName) - // Add schedulerName as customAttribute. + // Add jobName as customAttribute. customAttributes := map[string]string{ v1beta1.CloudSchedulerSourceJobName: jobName, } diff --git a/pkg/reconciler/events/storage/storage.go b/pkg/reconciler/events/storage/storage.go index 37658f1819..d82c81a002 100644 --- a/pkg/reconciler/events/storage/storage.go +++ b/pkg/reconciler/events/storage/storage.go @@ -18,6 +18,7 @@ package storage import ( "context" + "go.uber.org/zap" "google.golang.org/grpc/codes" diff --git a/pkg/reconciler/messaging/channel/channel.go b/pkg/reconciler/messaging/channel/channel.go index 982ceb37b3..132e9fe8bb 100644 --- a/pkg/reconciler/messaging/channel/channel.go +++ b/pkg/reconciler/messaging/channel/channel.go @@ -19,6 +19,7 @@ package channel import ( "context" "fmt" + "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality"