Skip to content
This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Removing/Updating old publisher, ra, and pubsub resources #1380

Merged
merged 12 commits into from
Jul 7, 2020
25 changes: 23 additions & 2 deletions pkg/reconciler/events/auditlogs/auditlogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ limitations under the License.
package auditlogs

import (
"context"

"cloud.google.com/go/logging/logadmin"
"context"
"fmt"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -92,6 +92,12 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, s *v1beta1.CloudAuditLog
}
s.Status.StackdriverSink = sink
s.Status.MarkSinkReady()

// TODO remove after 0.16 cut.
if err := c.deleteOldSink(ctx, s); err != nil {
return reconciler.NewEvent(corev1.EventTypeWarning, "DeleteSinkFailed", "Failed to delete StackDriver sink: %s", err.Error())
nachocano marked this conversation as resolved.
Show resolved Hide resolved
}

c.Logger.Debugf("Reconciled Stackdriver sink: %+v", sink)

return reconciler.NewEvent(corev1.EventTypeNormal, reconciledSuccessReason, `CloudAuditLogsSource reconciled: "%s/%s"`, s.Namespace, s.Name)
Expand Down Expand Up @@ -204,3 +210,18 @@ func (c *Reconciler) FinalizeKind(ctx context.Context, s *v1beta1.CloudAuditLogs
s.Status.StackdriverSink = ""
return nil
}

// TODO remove after 0.16 cut.
func (c *Reconciler) deleteOldSink(ctx context.Context, s *v1beta1.CloudAuditLogsSource) error {
logadminClient, err := c.logadminClientProvider(ctx, s.Status.ProjectID)
if err != nil {
logging.FromContext(ctx).Desugar().Error("Failed to create LogAdmin client", zap.Error(err))
return err
}
oldSinkName := fmt.Sprintf("sink-%s", string(s.UID))
if err = logadminClient.DeleteSink(ctx, oldSinkName); status.Code(err) != codes.NotFound {
logging.FromContext(ctx).Desugar().Error("Failed to delete StackDriver sink", zap.String("sinkName", oldSinkName), zap.Error(err))
nachocano marked this conversation as resolved.
Show resolved Hide resolved
return err
}
return nil
}
1 change: 0 additions & 1 deletion pkg/reconciler/events/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package scheduler

import (
"context"

"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"knative.dev/pkg/logging"
Expand Down
1 change: 0 additions & 1 deletion pkg/reconciler/events/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package storage

import (
"context"

"go.uber.org/zap"

"google.golang.org/grpc/codes"
Expand Down
34 changes: 33 additions & 1 deletion pkg/reconciler/intevents/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrs "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/pkg/apis"
"knative.dev/pkg/logging"
pkgreconciler "knative.dev/pkg/reconciler"
Expand Down Expand Up @@ -106,6 +106,22 @@ func (psb *PubSubBase) reconcileTopic(ctx context.Context, pubsubable duck.PubSu
} else if err != nil {
logging.FromContext(ctx).Desugar().Error("Failed to get Topic", zap.Error(err))
return nil, fmt.Errorf("failed to get Topic: %w", err)
// TODO remove this else if after 0.16 cut.
} else if newTopic.Spec.Topic != t.Spec.Topic {
// We check whether the topic changed. This can only happen when updating to 0.16 as the spec.topic is immutable.
// We have to delete the oldTopic and create a new one here.
logging.FromContext(ctx).Desugar().Info("Deleting old Topic", zap.Any("topic", t))
err = topics.Delete(t.Name, nil)
if err != nil {
logging.FromContext(ctx).Desugar().Error("Failed to delete old Topic", zap.Any("topic", t), zap.Error(err))
return nil, fmt.Errorf("failed to delete Topic: %w", err)
}
logging.FromContext(ctx).Desugar().Debug("Creating new Topic", zap.Any("topic", newTopic))
t, err = topics.Create(newTopic)
if err != nil {
logging.FromContext(ctx).Desugar().Error("Failed to create Topic", zap.Any("topic", newTopic), zap.Error(err))
return nil, fmt.Errorf("failed to create Topic: %w", err)
}
// Check whether the specs differ and update the Topic if so.
} else if !equality.Semantic.DeepDerivative(newTopic.Spec, t.Spec) {
// Don't modify the informers copy.
Expand Down Expand Up @@ -167,6 +183,22 @@ func (psb *PubSubBase) ReconcilePullSubscription(ctx context.Context, pubsubable
logging.FromContext(ctx).Desugar().Error("Failed to create PullSubscription", zap.Any("ps", newPS), zap.Error(err))
return nil, pkgreconciler.NewEvent(corev1.EventTypeWarning, pullSubscriptionCreateFailedReason, "Creating PullSubscription failed with: %s", err.Error())
}
// TODO remove this else if after 0.16 cut.
} else if newPS.Spec.Topic != ps.Spec.Topic {
// We check whether the topic changed. This can only happen when updating to 0.16 as the spec.topic is immutable.
// We have to delete the old PS and create a new one here.
logging.FromContext(ctx).Desugar().Info("Deleting old PullSubscription", zap.Any("ps", ps))
err = pullSubscriptions.Delete(ps.Name, nil)
if err != nil {
logging.FromContext(ctx).Desugar().Error("Failed to delete old PullSubscription", zap.Any("ps", ps), zap.Error(err))
return nil, fmt.Errorf("failed to delete Pullsubscription: %w", err)
}
logging.FromContext(ctx).Desugar().Debug("Creating new PullSubscription", zap.Any("ps", newPS))
ps, err = pullSubscriptions.Create(newPS)
if err != nil {
logging.FromContext(ctx).Desugar().Error("Failed to create PullSubscription", zap.Any("ps", newPS), zap.Error(err))
return nil, fmt.Errorf("failed to create PullSubscription: %w", err)
}
// Check whether the specs differ and update the PS if so.
} else if !equality.Semantic.DeepDerivative(newPS.Spec, ps.Spec) {
// Don't modify the informers copy.
Expand Down
1 change: 0 additions & 1 deletion pkg/reconciler/intevents/topic/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, topic *v1beta1.Topic) re

// If enablePublisher is false, then skip creating the publisher.
if enablePublisher := topic.Spec.EnablePublisher; enablePublisher != nil && !*enablePublisher {
// TODO delete previous publishers before the 0.16 cut: https://github.com/google/knative-gcp/issues/1217
return reconciler.NewEvent(corev1.EventTypeNormal, reconciledSuccessReason, `Topic reconciled: "%s/%s"`, topic.Namespace, topic.Name)
}

Expand Down
98 changes: 72 additions & 26 deletions pkg/reconciler/messaging/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ package channel
import (
"context"
"fmt"

"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -213,6 +211,22 @@ func (r *Reconciler) syncSubscribers(ctx context.Context, channel *v1beta1.Chann
return err
}
r.Recorder.Eventf(channel, corev1.EventTypeNormal, "SubscriberCreated", "Created Subscriber %q", ps.Name)
// TODO remove this else if after 0.16 cut.
} else if ps.Spec.Topic != existingPs.Spec.Topic {
// We check whether the topic changed. This can only happen when updating to 0.16 as the spec.topic is immutable.
// We have to delete the old PS and create a new one here.
logging.FromContext(ctx).Desugar().Info("Deleting old PullSubscription", zap.Any("ps", existingPs))
err := r.RunClientSet.InternalV1beta1().PullSubscriptions(channel.Namespace).Delete(existingPs.Name, nil)
if err != nil {
logging.FromContext(ctx).Desugar().Error("Failed to delete old PullSubscription", zap.Any("ps", existingPs), zap.Error(err))
return fmt.Errorf("failed to delete Pullsubscription: %w", err)
}
logging.FromContext(ctx).Desugar().Debug("Creating new PullSubscription", zap.Any("ps", ps))
ps, err = r.RunClientSet.InternalV1beta1().PullSubscriptions(channel.Namespace).Create(ps)
if err != nil {
logging.FromContext(ctx).Desugar().Error("Failed to create PullSubscription", zap.Any("ps", ps), zap.Error(err))
return fmt.Errorf("failed to create PullSubscription: %w", err)
}
} else if !equality.Semantic.DeepEqual(ps.Spec, existingPs.Spec) {
// Don't modify the informers copy.
desired := existingPs.DeepCopy()
Expand Down Expand Up @@ -286,22 +300,12 @@ func (r *Reconciler) syncSubscribersStatus(ctx context.Context, channel *v1beta1

func (r *Reconciler) reconcileTopic(ctx context.Context, channel *v1beta1.Channel) (*inteventsv1beta1.Topic, error) {
topic, err := r.getTopic(ctx, channel)
if err != nil && !apierrors.IsNotFound(err) {
logging.FromContext(ctx).Desugar().Error("Unable to get a Topic", zap.Error(err))
return nil, err
}
if topic != nil {
if topic.Status.Address != nil {
channel.Status.SetAddress(topic.Status.Address.URL)
} else {
channel.Status.SetAddress(nil)
}
return topic, nil
}

nachocano marked this conversation as resolved.
Show resolved Hide resolved
clusterName := channel.GetAnnotations()[duckv1beta1.ClusterNameAnnotation]
name := resources.GeneratePublisherName(channel)
t := resources.MakeTopic(&resources.TopicArgs{
Owner: channel,
Name: resources.GeneratePublisherName(channel),
Name: name,
Project: channel.Spec.Project,
ServiceAccountName: channel.Spec.ServiceAccountName,
Secret: channel.Spec.Secret,
Expand All @@ -310,14 +314,60 @@ func (r *Reconciler) reconcileTopic(ctx context.Context, channel *v1beta1.Channe
Annotations: resources.GetTopicAnnotations(clusterName),
})

topic, err = r.RunClientSet.InternalV1beta1().Topics(channel.Namespace).Create(t)
if err != nil {
logging.FromContext(ctx).Desugar().Error("Failed to create Topic", zap.Error(err))
r.Recorder.Eventf(channel, corev1.EventTypeWarning, "TopicCreateFailed", "Failed to created Topic %q: %s", topic.Name, err.Error())
return nil, err
if apierrs.IsNotFound(err) {
Harwayne marked this conversation as resolved.
Show resolved Hide resolved
topic, err = r.RunClientSet.InternalV1beta1().Topics(channel.Namespace).Create(t)
if err != nil {
logging.FromContext(ctx).Desugar().Error("Failed to create Topic", zap.Error(err))
r.Recorder.Eventf(channel, corev1.EventTypeWarning, "TopicCreateFailed", "Failed to created Topic %q: %s", topic.Name, err.Error())
return nil, err
}
r.Recorder.Eventf(channel, corev1.EventTypeNormal, "TopicCreated", "Created Topic %q", topic.Name)
return topic, nil
} else if err != nil {
logging.FromContext(ctx).Desugar().Error("Failed to get Topic", zap.Error(err))
return nil, fmt.Errorf("failed to get Topic: %w", err)
} else if !metav1.IsControlledBy(topic, channel) {
channel.Status.MarkTopicNotOwned("Topic %q is owned by another resource.", name)
return nil, fmt.Errorf("Channel: %s does not own Topic: %s", channel.Name, name)
// TODO remove this else if after 0.16 cut.
} else if t.Spec.Topic != topic.Spec.Topic {
// We check whether the topic changed. This can only happen when updating to 0.16 as the spec.topic is immutable.
// We have to delete the oldTopic and create a new one here.
logging.FromContext(ctx).Desugar().Info("Deleting old Topic", zap.Any("topic", topic))
err = r.RunClientSet.InternalV1beta1().Topics(channel.Namespace).Delete(topic.Name, nil)
if err != nil {
logging.FromContext(ctx).Desugar().Error("Failed to delete old Topic", zap.Any("topic", topic), zap.Error(err))
return nil, fmt.Errorf("failed to update Topic: %w", err)
}
logging.FromContext(ctx).Desugar().Debug("Creating new Topic", zap.Any("topic", t))
t, err = r.RunClientSet.InternalV1beta1().Topics(channel.Namespace).Create(t)
if err != nil {
logging.FromContext(ctx).Desugar().Error("Failed to create Topic", zap.Any("topic", t), zap.Error(err))
return nil, fmt.Errorf("failed to create Topic: %w", err)
}
return t, nil
} else if !equality.Semantic.DeepDerivative(t.Spec, topic.Spec) {
// Don't modify the informers copy.
desired := topic.DeepCopy()
desired.Spec = t.Spec
logging.FromContext(ctx).Desugar().Debug("Updating Topic", zap.Any("topic", desired))
t, err = r.RunClientSet.InternalV1beta1().Topics(channel.Namespace).Update(desired)
if err != nil {
logging.FromContext(ctx).Desugar().Error("Failed to update Topic", zap.Any("topic", topic), zap.Error(err))
return nil, fmt.Errorf("failed to update Topic: %w", err)
}
return t, nil
}
r.Recorder.Eventf(channel, corev1.EventTypeNormal, "TopicCreated", "Created Topic %q", topic.Name)
return topic, err

if topic != nil {
if topic.Status.Address != nil {
channel.Status.SetAddress(topic.Status.Address.URL)
} else {
channel.Status.SetAddress(nil)
}
}

return topic, nil
}

func (r *Reconciler) getTopic(_ context.Context, channel *v1beta1.Channel) (*inteventsv1beta1.Topic, error) {
Expand All @@ -326,10 +376,6 @@ func (r *Reconciler) getTopic(_ context.Context, channel *v1beta1.Channel) (*int
if err != nil {
return nil, err
}
if !metav1.IsControlledBy(topic, channel) {
Harwayne marked this conversation as resolved.
Show resolved Hide resolved
channel.Status.MarkTopicNotOwned("Topic %q is owned by another resource.", name)
return nil, fmt.Errorf("Channel: %s does not own Topic: %s", channel.Name, name)
}
return topic, nil
}

Expand Down