From db592fcd3b8f7f0170a255a19fa9e014e3d9b59d Mon Sep 17 00:00:00 2001 From: Jimmy Lin Date: Wed, 16 Sep 2020 11:50:38 -0400 Subject: [PATCH] Move the common AllowedRegionPolicy computation to dataresidency/default.go --- pkg/apis/configs/dataresidency/defaults.go | 21 +++++++++++++++++++++ pkg/reconciler/broker/broker.go | 9 +++++---- pkg/reconciler/intevents/topic/topic.go | 4 +--- pkg/reconciler/trigger/trigger.go | 9 +++++---- 4 files changed, 32 insertions(+), 11 deletions(-) diff --git a/pkg/apis/configs/dataresidency/defaults.go b/pkg/apis/configs/dataresidency/defaults.go index 37515f09f5..b45e5d5b87 100644 --- a/pkg/apis/configs/dataresidency/defaults.go +++ b/pkg/apis/configs/dataresidency/defaults.go @@ -16,6 +16,10 @@ limitations under the License. package dataresidency +import ( + "cloud.google.com/go/pubsub" +) + // Defaults includes the default values to be populated by the Webhook. type Defaults struct { // ClusterDefaults are the data residency defaults to use for all namepaces @@ -39,6 +43,23 @@ func (d *Defaults) scoped() *ScopedDefaults { return scopedDefaults } +// AllowedPersistenceRegions gets the AllowedPersistenceRegions setting in the default. func (d *Defaults) AllowedPersistenceRegions() []string { return d.scoped().AllowedPersistenceRegions } + +// ComputeAllowedPersistenceRegions computes the final message storage policy in +// topicConfig. Return true if the topicConfig is updated. +func (d *Defaults) ComputeAllowedPersistenceRegions(topicConfig *pubsub.TopicConfig) bool { + // We can do subset of both in the future, but for now, we just overwrite the + // configuration as the relationship between region and zones are not clear to handle, + // eg. us-east1 vs us-east1-a. Important note: setting the AllowedPersistenceRegions + // to empty string slice is an error, should set it to nil for all regions. + allowedRegions := d.AllowedPersistenceRegions() + if allowedRegions == nil || len(allowedRegions) == 0 { + return false + } + + topicConfig.MessageStoragePolicy.AllowedPersistenceRegions = allowedRegions + return true +} diff --git a/pkg/reconciler/broker/broker.go b/pkg/reconciler/broker/broker.go index 35ecc4d886..8f9f19cbca 100644 --- a/pkg/reconciler/broker/broker.go +++ b/pkg/reconciler/broker/broker.go @@ -149,10 +149,11 @@ func (r *Reconciler) reconcileDecouplingTopicAndSubscription(ctx context.Context // Check if topic exists, and if not, create it. topicID := resources.GenerateDecouplingTopicName(b) topicConfig := &pubsub.TopicConfig{Labels: labels} - if dataresidencyConfig := r.dataresidencyStore.Load(); dataresidencyConfig != nil { - if allowedRegions := dataresidencyConfig.DataResidencyDefaults.AllowedPersistenceRegions(); len(allowedRegions) != 0 { - topicConfig.MessageStoragePolicy.AllowedPersistenceRegions = allowedRegions - logging.FromContext(ctx).Info("Updated Topic Config for Broker", zap.Any("topicConfig", *topicConfig)) + if r.dataresidencyStore != nil { + if dataresidencyConfig := r.dataresidencyStore.Load(); dataresidencyConfig != nil { + if dataresidencyConfig.DataResidencyDefaults.ComputeAllowedPersistenceRegions(topicConfig) { + logging.FromContext(ctx).Info("Updated Topic Config for Broker", zap.Any("topicConfig", *topicConfig)) + } } } topic, err := pubsubReconciler.ReconcileTopic(ctx, topicID, topicConfig, b, &b.Status) diff --git a/pkg/reconciler/intevents/topic/topic.go b/pkg/reconciler/intevents/topic/topic.go index fd52f73ee8..fae7150130 100644 --- a/pkg/reconciler/intevents/topic/topic.go +++ b/pkg/reconciler/intevents/topic/topic.go @@ -163,9 +163,7 @@ func (r *Reconciler) reconcileTopic(ctx context.Context, topic *v1.Topic) error topicConfig := &pubsub.TopicConfig{} if r.dataresidencyStore != nil { if dataresidencyCfg := r.dataresidencyStore.Load(); dataresidencyCfg != nil { - // Empty region list is an error in pubsub, we will left it unset in this case which means all regions - if allowedRegions := dataresidencyCfg.DataResidencyDefaults.AllowedPersistenceRegions(); len(allowedRegions) != 0 { - topicConfig.MessageStoragePolicy.AllowedPersistenceRegions = allowedRegions + if dataresidencyCfg.DataResidencyDefaults.ComputeAllowedPersistenceRegions(topicConfig) { r.Logger.Infow("Updated Topic Config", zap.Any("topicConfig", *topicConfig)) } } diff --git a/pkg/reconciler/trigger/trigger.go b/pkg/reconciler/trigger/trigger.go index 7024d66e88..78c850860c 100644 --- a/pkg/reconciler/trigger/trigger.go +++ b/pkg/reconciler/trigger/trigger.go @@ -226,10 +226,11 @@ func (r *Reconciler) reconcileRetryTopicAndSubscription(ctx context.Context, tri // Check if topic exists, and if not, create it. topicID := resources.GenerateRetryTopicName(trig) topicConfig := &pubsub.TopicConfig{Labels: labels} - if dataresidencyConfig := r.dataresidencyStore.Load(); dataresidencyConfig != nil { - if allowedRegions := dataresidencyConfig.DataResidencyDefaults.AllowedPersistenceRegions(); len(allowedRegions) != 0 { - topicConfig.MessageStoragePolicy.AllowedPersistenceRegions = allowedRegions - logging.FromContext(ctx).Info("Updated Topic Config for Trigger", zap.Any("topicConfig", *topicConfig)) + if r.dataresidencyStore != nil { + if dataresidencyConfig := r.dataresidencyStore.Load(); dataresidencyConfig != nil { + if dataresidencyConfig.DataResidencyDefaults.ComputeAllowedPersistenceRegions(topicConfig) { + logging.FromContext(ctx).Info("Updated Topic Config for Trigger", zap.Any("topicConfig", *topicConfig)) + } } } topic, err := pubsubReconciler.ReconcileTopic(ctx, topicID, topicConfig, trig, &trig.Status)