From 12817911abc21273a2841f5c0fc87e80b32e08cb Mon Sep 17 00:00:00 2001 From: "Zhongduo Lin (Jimmy)" Date: Fri, 18 Sep 2020 22:27:44 -0400 Subject: [PATCH] Add configuration for data residency (#1681) * add configuration map for data residency * Use array instead of csv string for allowedpersistenceregions configuration * Add default to the data residency config * Add TestNewDefaultsConfigFromConfigMap to dataresidency_defaults_test.go * add store and singleton for data residency * Add data residency to topic reconciler which is shared by all sources, tested by manually created a Topic and check the created topic in GCP * Add info logging when updating topic config in sources * Add data residency support to broker and trigger, not using injection * Move the common AllowedRegionPolicy computation to dataresidency/default.go * Fix unit test failure for broker and trigger * Add more test to TestComputeAllowedPersistenceRegions * Add singleton test for data residency to work around the covery problem, rename config-data-residency.yaml to make it consistent with other files * Resolve comments from PR * Add test to broker to test data residency configuration and labels * DataResidency test added to trigger * Add empty data residency config * Add helper function for data residency configuration map creation --- cmd/controller/wire.go | 2 + cmd/controller/wire_gen.go | 4 +- config/config-data-residency.yaml | 1 + config/core/configmaps/data-residency.yaml | 56 ++++++ hack/update-codegen.sh | 1 + .../dataresidency/dataresidency_defaults.go | 70 ++++++++ .../dataresidency_defaults_test.go | 160 ++++++++++++++++++ pkg/apis/configs/dataresidency/defaults.go | 65 +++++++ pkg/apis/configs/dataresidency/doc.go | 21 +++ pkg/apis/configs/dataresidency/singleton.go | 40 +++++ .../configs/dataresidency/singleton_test.go | 44 +++++ pkg/apis/configs/dataresidency/store.go | 92 ++++++++++ pkg/apis/configs/dataresidency/store_test.go | 45 +++++ .../testdata/config-dataresidency.yaml | 54 ++++++ .../dataresidency/zz_generated.deepcopy.go | 59 +++++++ pkg/reconciler/broker/broker.go | 11 ++ pkg/reconciler/broker/broker_test.go | 67 +++++++- pkg/reconciler/broker/controller.go | 11 +- pkg/reconciler/broker/controller_test.go | 9 + pkg/reconciler/intevents/topic/controller.go | 19 ++- .../intevents/topic/controller_test.go | 4 +- pkg/reconciler/intevents/topic/topic.go | 14 +- pkg/reconciler/testing/configmap.go | 27 +++ pkg/reconciler/testing/pstest.go | 17 ++ pkg/reconciler/testing/store.go | 9 + pkg/reconciler/trigger/controller.go | 12 +- pkg/reconciler/trigger/controller_test.go | 8 + pkg/reconciler/trigger/trigger.go | 10 ++ pkg/reconciler/trigger/trigger_test.go | 81 +++++++++ 29 files changed, 988 insertions(+), 25 deletions(-) create mode 120000 config/config-data-residency.yaml create mode 100644 config/core/configmaps/data-residency.yaml create mode 100644 pkg/apis/configs/dataresidency/dataresidency_defaults.go create mode 100644 pkg/apis/configs/dataresidency/dataresidency_defaults_test.go create mode 100644 pkg/apis/configs/dataresidency/defaults.go create mode 100644 pkg/apis/configs/dataresidency/doc.go create mode 100644 pkg/apis/configs/dataresidency/singleton.go create mode 100644 pkg/apis/configs/dataresidency/singleton_test.go create mode 100644 pkg/apis/configs/dataresidency/store.go create mode 100644 pkg/apis/configs/dataresidency/store_test.go create mode 100644 pkg/apis/configs/dataresidency/testdata/config-dataresidency.yaml create mode 100644 pkg/apis/configs/dataresidency/zz_generated.deepcopy.go diff --git a/cmd/controller/wire.go b/cmd/controller/wire.go index 996dacf804..10879a54ab 100644 --- a/cmd/controller/wire.go +++ b/cmd/controller/wire.go @@ -18,6 +18,7 @@ package main import ( "context" + "github.com/google/knative-gcp/pkg/apis/configs/dataresidency" "github.com/google/knative-gcp/pkg/apis/configs/gcpauth" "github.com/google/knative-gcp/pkg/reconciler/events/auditlogs" "github.com/google/knative-gcp/pkg/reconciler/events/build" @@ -39,6 +40,7 @@ func InitializeControllers(ctx context.Context) ([]injection.ControllerConstruct ClientOptions, iam.PolicyManagerSet, wire.Struct(new(gcpauth.StoreSingleton)), + wire.Struct(new(dataresidency.StoreSingleton)), auditlogs.NewConstructor, storage.NewConstructor, scheduler.NewConstructor, diff --git a/cmd/controller/wire_gen.go b/cmd/controller/wire_gen.go index aa468382c2..4b4bcad37b 100644 --- a/cmd/controller/wire_gen.go +++ b/cmd/controller/wire_gen.go @@ -8,6 +8,7 @@ package main import ( "cloud.google.com/go/iam/admin/apiv1" "context" + "github.com/google/knative-gcp/pkg/apis/configs/dataresidency" "github.com/google/knative-gcp/pkg/apis/configs/gcpauth" "github.com/google/knative-gcp/pkg/reconciler/events/auditlogs" "github.com/google/knative-gcp/pkg/reconciler/events/build" @@ -46,7 +47,8 @@ func InitializeControllers(ctx context.Context) ([]injection.ControllerConstruct buildConstructor := build.NewConstructor(iamPolicyManager, storeSingleton) staticConstructor := static.NewConstructor(iamPolicyManager, storeSingleton) kedaConstructor := keda.NewConstructor(iamPolicyManager, storeSingleton) - topicConstructor := topic.NewConstructor(iamPolicyManager, storeSingleton) + dataresidencyStoreSingleton := &dataresidency.StoreSingleton{} + topicConstructor := topic.NewConstructor(iamPolicyManager, storeSingleton, dataresidencyStoreSingleton) channelConstructor := channel.NewConstructor(iamPolicyManager, storeSingleton) v2 := Controllers(constructor, storageConstructor, schedulerConstructor, pubsubConstructor, buildConstructor, staticConstructor, kedaConstructor, topicConstructor, channelConstructor) return v2, nil diff --git a/config/config-data-residency.yaml b/config/config-data-residency.yaml new file mode 120000 index 0000000000..c8e7608651 --- /dev/null +++ b/config/config-data-residency.yaml @@ -0,0 +1 @@ +core/configmaps/data-residency.yaml \ No newline at end of file diff --git a/config/core/configmaps/data-residency.yaml b/config/core/configmaps/data-residency.yaml new file mode 100644 index 0000000000..8e8baec7e9 --- /dev/null +++ b/config/core/configmaps/data-residency.yaml @@ -0,0 +1,56 @@ +# Copyright 2020 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: v1 +kind: ConfigMap +metadata: + name: config-dataresidency + namespace: cloud-run-events + annotations: + knative.dev/example-checksum: "5e76f9d5" +data: + default-dataresidency-config: | + clusterDefaults: + messagestoragepolicy.allowedpersistenceregions: [] + _example: | + ################################ + # # + # EXAMPLE CONFIGURATION # + # # + ################################ + + # This block is not actually functional configuration, + # but serves to illustrate the available configuration + # options and document them in a way that is accessible + # to users that `kubectl edit` this config map. + # + # These sample configuration options may be copied out of + # this example block and unindented to be in the data block + # to actually change the configuration. + + # default-dataresidency-config is the configuration for determining the default + # data residency to apply to all objects that require data residency. + # This is expected to be Channels and Sources and Brokers. + # + # We only support cluster scoped now + default-dataresidency-config: | + # clusterDefaults are the defaults to apply to every namespace in the + # cluster + clusterDefaults: + # messagestoragepolicy.allowedpersistenceregions field specifies + # all the allowed regions for data residency. The default or an empty value will + # mean no data residency requirement. + messagestoragepolicy.allowedpersistenceregions: + - us-east1 + - us-west1 diff --git a/hack/update-codegen.sh b/hack/update-codegen.sh index 59159585b2..16d0c2e87a 100755 --- a/hack/update-codegen.sh +++ b/hack/update-codegen.sh @@ -77,6 +77,7 @@ ${GOPATH}/bin/deepcopy-gen \ --go-header-file ${REPO_ROOT_DIR}/hack/boilerplate/boilerplate.go.txt \ -i github.com/google/knative-gcp/pkg/apis/configs/gcpauth \ -i github.com/google/knative-gcp/pkg/apis/configs/broker \ + -i github.com/google/knative-gcp/pkg/apis/configs/dataresidency \ # TODO(yolocs): generate autoscaling v2beta2 in knative/pkg. OUTPUT_PKG="github.com/google/knative-gcp/pkg/client/injection/kube" \ diff --git a/pkg/apis/configs/dataresidency/dataresidency_defaults.go b/pkg/apis/configs/dataresidency/dataresidency_defaults.go new file mode 100644 index 0000000000..7f1e173dd9 --- /dev/null +++ b/pkg/apis/configs/dataresidency/dataresidency_defaults.go @@ -0,0 +1,70 @@ +/* +Copyright 2020 Google LLC. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dataresidency + +import ( + "encoding/json" + "fmt" + + corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/yaml" +) + +const ( + // configName is the name of config map for the default data residency that + // GCP resources should use. + configName = "config-dataresidency" + + // defaulterKey is the key in the ConfigMap to get the name of the default + // DataResidency setting. + defaulterKey = "default-dataresidency-config" +) + +// ConfigMapName returns the name of the configmap to read for default data residency settings. +func ConfigMapName() string { + return configName +} + +// NewDefaultsConfigFromConfigMap creates a Defaults from the supplied configMap. +func NewDefaultsConfigFromConfigMap(config *corev1.ConfigMap) (*Defaults, error) { + return NewDefaultsConfigFromMap(config.Data) +} + +// NewDefaultsConfigFromMap creates a Defaults from the supplied Map. +func NewDefaultsConfigFromMap(data map[string]string) (*Defaults, error) { + nc := &Defaults{} + + // Parse out the data residency configuration. + value, present := data[defaulterKey] + // Data residency configuration is not required, in which case it will mean + // allow all regions, so we simply use an empty one + if !present || value == "" { + return nc, nil + } + if err := parseEntry(value, nc); err != nil { + return nil, fmt.Errorf("failed to parse the entry: %s", err) + } + return nc, nil +} + +func parseEntry(entry string, out interface{}) error { + j, err := yaml.YAMLToJSON([]byte(entry)) + if err != nil { + return fmt.Errorf("ConfigMap's value could not be converted to JSON: %s : %v", err, entry) + } + return json.Unmarshal(j, &out) +} diff --git a/pkg/apis/configs/dataresidency/dataresidency_defaults_test.go b/pkg/apis/configs/dataresidency/dataresidency_defaults_test.go new file mode 100644 index 0000000000..48f449ea5d --- /dev/null +++ b/pkg/apis/configs/dataresidency/dataresidency_defaults_test.go @@ -0,0 +1,160 @@ +/* +Copyright 2020 Google LLC. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dataresidency + +import ( + "testing" + + "cloud.google.com/go/pubsub" + + "github.com/google/go-cmp/cmp" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + . "knative.dev/pkg/configmap/testing" + _ "knative.dev/pkg/system/testing" +) + +func TestDefaultsConfigurationFromFile(t *testing.T) { + _, example := ConfigMapsFromTestFile(t, configName, defaulterKey) + if _, err := NewDefaultsConfigFromConfigMap(example); err != nil { + t.Errorf("NewDefaultsConfigFromConfigMap(example) = %v", err) + } +} + +func TestNewDefaultsConfigFromConfigMap(t *testing.T) { + _, example := ConfigMapsFromTestFile(t, configName, defaulterKey) + defaults, err := NewDefaultsConfigFromConfigMap(example) + if err != nil { + t.Fatalf("NewDefaultsConfigFromConfigMap(example) = %v", err) + } + + // Only cluster wide configuration is supported now, but we use the namespace + // as the test name and for future extension. + testCases := []struct { + ns string + regions []string + }{ + { + ns: "cluster-wide", + regions: []string{"us-east1", "us-west1"}, + }, + } + + for _, tc := range testCases { + t.Run(tc.ns, func(t *testing.T) { + if diff := cmp.Diff(tc.regions, defaults.AllowedPersistenceRegions()); diff != "" { + t.Errorf("Unexpected value (-want +got): %s", diff) + } + }) + } +} + +func TestComputeAllowedPersistenceRegions(t *testing.T) { + // Only cluster wide configuration is supported now, but we use the namespace + // as the test name and for future extension. + testCases := []struct { + ns string + topicConfigRegions []string + dsRegions []string + expectedRegions []string + }{ + { + ns: "subset", + topicConfigRegions: []string{"us-east1", "us-west1"}, + dsRegions: []string{"us-west1"}, + expectedRegions: []string{"us-west1"}, + }, + { + ns: "conflict", + topicConfigRegions: []string{"us-east1"}, + dsRegions: []string{"us-west1"}, + expectedRegions: []string{"us-west1"}, + }, + { + ns: "topic-nil", + topicConfigRegions: nil, + dsRegions: []string{"us-west1"}, + expectedRegions: []string{"us-west1"}, + }, + { + ns: "topic-nil-ds-empty", + topicConfigRegions: nil, + dsRegions: []string{}, + expectedRegions: nil, + }, + { + ns: "ds-empty", + topicConfigRegions: []string{"us-east1"}, + dsRegions: []string{}, + expectedRegions: []string{"us-east1"}, + }, + } + + for _, tc := range testCases { + t.Run(tc.ns, func(t *testing.T) { + defaults, err := NewDefaultsConfigFromMap(map[string]string{}) + if err != nil { + t.Fatalf("NewDefaultsConfigFromConfigMap(empty) = %v", err) + } + defaults.ClusterDefaults.AllowedPersistenceRegions = tc.dsRegions + topicConfig := &pubsub.TopicConfig{} + topicConfig.MessageStoragePolicy.AllowedPersistenceRegions = tc.topicConfigRegions + defaults.ComputeAllowedPersistenceRegions(topicConfig) + if diff := cmp.Diff(tc.expectedRegions, topicConfig.MessageStoragePolicy.AllowedPersistenceRegions); diff != "" { + t.Errorf("Unexpected value (-want +got): %s", diff) + } + }) + } +} + +func TestNewDefaultsConfigFromConfigMapEmpty(t *testing.T) { + testCases := map[string]struct { + name string + config *corev1.ConfigMap + }{ + "empty data": { + config: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "cloud-run-events", + Name: configName, + }, + Data: map[string]string{}, + }, + }, + "missing key": { + config: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "cloud-run-events", + Name: configName, + }, + Data: map[string]string{ + "other-keys": "are-present", + }, + }, + }, + } + + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + _, err := NewDefaultsConfigFromConfigMap(tc.config) + if err != nil { + t.Errorf("Empty value or no key should pass") + } + }) + } +} diff --git a/pkg/apis/configs/dataresidency/defaults.go b/pkg/apis/configs/dataresidency/defaults.go new file mode 100644 index 0000000000..b45e5d5b87 --- /dev/null +++ b/pkg/apis/configs/dataresidency/defaults.go @@ -0,0 +1,65 @@ +/* +Copyright 2020 Google LLC. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dataresidency + +import ( + "cloud.google.com/go/pubsub" +) + +// Defaults includes the default values to be populated by the Webhook. +type Defaults struct { + // ClusterDefaults are the data residency defaults to use for all namepaces + ClusterDefaults ScopedDefaults `json:"clusterDefaults,omitempty"` +} + +// ScopedDefaults are the data residency setting defaults. +type ScopedDefaults struct { + // AllowedPersistenceRegions specifies the regions allowed for data + // storage. Eg "us-east1". An empty configuration means no data residency + // constraints. + AllowedPersistenceRegions []string `json:"messagestoragepolicy.allowedpersistenceregions,omitempty"` +} + +// scoped gets the scoped data residency defaults, for now we only have +// cluster scope. +func (d *Defaults) scoped() *ScopedDefaults { + scopedDefaults := &d.ClusterDefaults + // currently we don't support namespace, but if we do, we should check + // namespace default here. + return scopedDefaults +} + +// AllowedPersistenceRegions gets the AllowedPersistenceRegions setting in the default. +func (d *Defaults) AllowedPersistenceRegions() []string { + return d.scoped().AllowedPersistenceRegions +} + +// ComputeAllowedPersistenceRegions computes the final message storage policy in +// topicConfig. Return true if the topicConfig is updated. +func (d *Defaults) ComputeAllowedPersistenceRegions(topicConfig *pubsub.TopicConfig) bool { + // We can do subset of both in the future, but for now, we just overwrite the + // configuration as the relationship between region and zones are not clear to handle, + // eg. us-east1 vs us-east1-a. Important note: setting the AllowedPersistenceRegions + // to empty string slice is an error, should set it to nil for all regions. + allowedRegions := d.AllowedPersistenceRegions() + if allowedRegions == nil || len(allowedRegions) == 0 { + return false + } + + topicConfig.MessageStoragePolicy.AllowedPersistenceRegions = allowedRegions + return true +} diff --git a/pkg/apis/configs/dataresidency/doc.go b/pkg/apis/configs/dataresidency/doc.go new file mode 100644 index 0000000000..91b9dbf46f --- /dev/null +++ b/pkg/apis/configs/dataresidency/doc.go @@ -0,0 +1,21 @@ +/* +Copyright 2020 Google LLC. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// +k8s:deepcopy-gen=package + +// dataresidency holds the typed objects that define the schemas for default +// DataResidency for all components. +package dataresidency diff --git a/pkg/apis/configs/dataresidency/singleton.go b/pkg/apis/configs/dataresidency/singleton.go new file mode 100644 index 0000000000..f4b35f6b97 --- /dev/null +++ b/pkg/apis/configs/dataresidency/singleton.go @@ -0,0 +1,40 @@ +/* +Copyright 2020 Google LLC. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dataresidency + +import ( + "context" + "sync" + + "knative.dev/pkg/logging" + + "knative.dev/pkg/configmap" +) + +// +k8s:deepcopy-gen=false +type StoreSingleton struct { + setup sync.Once + store *Store +} + +func (s *StoreSingleton) Store(ctx context.Context, cmw configmap.Watcher) *Store { + s.setup.Do(func() { + s.store = NewStore(logging.FromContext(ctx).Named("config-data-residency-store")) + s.store.WatchConfigs(cmw) + }) + return s.store +} diff --git a/pkg/apis/configs/dataresidency/singleton_test.go b/pkg/apis/configs/dataresidency/singleton_test.go new file mode 100644 index 0000000000..3295228ec1 --- /dev/null +++ b/pkg/apis/configs/dataresidency/singleton_test.go @@ -0,0 +1,44 @@ +/* +Copyright 2020 Google LLC. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dataresidency + +import ( + "context" + "testing" + + "github.com/google/go-cmp/cmp" + . "knative.dev/pkg/configmap" + . "knative.dev/pkg/configmap/testing" +) + +func TestStoreSingletonLoadWithContext(t *testing.T) { + ctx := context.Background() + + storeSingleton := &StoreSingleton{} + + _, defaultsConfig := ConfigMapsFromTestFile(t, configName, defaulterKey) + cmw := NewStaticWatcher(defaultsConfig) + + store := storeSingleton.Store(ctx, cmw) + + t.Run("defaults", func(t *testing.T) { + expected, _ := NewDefaultsConfigFromConfigMap(defaultsConfig) + if diff := cmp.Diff(expected, store.Load().DataResidencyDefaults); diff != "" { + t.Fatalf("Unexpected defaults config (-want, +got): %v", diff) + } + }) +} diff --git a/pkg/apis/configs/dataresidency/store.go b/pkg/apis/configs/dataresidency/store.go new file mode 100644 index 0000000000..44fec02a42 --- /dev/null +++ b/pkg/apis/configs/dataresidency/store.go @@ -0,0 +1,92 @@ +/* +Copyright 2020 Google LLC. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dataresidency + +import ( + "context" + + "knative.dev/pkg/configmap" +) + +type dataresidencyCfgKey struct{} + +// Config holds the collection of configurations that we attach to contexts. +// +k8s:deepcopy-gen=false +type Config struct { + DataResidencyDefaults *Defaults +} + +// FromContext extracts a Config from the provided context. +func FromContext(ctx context.Context) *Config { + x, ok := ctx.Value(dataresidencyCfgKey{}).(*Config) + if ok { + return x + } + return nil +} + +// FromContextOrDefaults is like FromContext, but when no Config is attached it +// returns a Config populated with the defaults for each of the Config fields. +func FromContextOrDefaults(ctx context.Context) *Config { + if cfg := FromContext(ctx); cfg != nil { + return cfg + } + defaults, _ := NewDefaultsConfigFromMap(map[string]string{}) + return &Config{ + DataResidencyDefaults: defaults, + } +} + +// ToContext attaches the provided Config to the provided context, returning the +// new context with the Config attached. +func ToContext(ctx context.Context, c *Config) context.Context { + return context.WithValue(ctx, dataresidencyCfgKey{}, c) +} + +// Store is a typed wrapper around configmap.Untyped store to handle our ConfigMaps. +// +k8s:deepcopy-gen=false +type Store struct { + *configmap.UntypedStore +} + +// NewStore creates a new store of Configs and optionally calls functions when ConfigMaps are updated. +func NewStore(logger configmap.Logger, onAfterStore ...func(name string, value interface{})) *Store { + store := &Store{ + UntypedStore: configmap.NewUntypedStore( + "data-residency-defaults", + logger, + configmap.Constructors{ + ConfigMapName(): NewDefaultsConfigFromConfigMap, + }, + onAfterStore..., + ), + } + + return store +} + +// ToContext attaches the current Config state to the provided context. +func (s *Store) ToContext(ctx context.Context) context.Context { + return ToContext(ctx, s.Load()) +} + +// Load creates a Config from the current config state of the Store. +func (s *Store) Load() *Config { + return &Config{ + DataResidencyDefaults: s.UntypedLoad(ConfigMapName()).(*Defaults).DeepCopy(), + } +} diff --git a/pkg/apis/configs/dataresidency/store_test.go b/pkg/apis/configs/dataresidency/store_test.go new file mode 100644 index 0000000000..48acca1bf6 --- /dev/null +++ b/pkg/apis/configs/dataresidency/store_test.go @@ -0,0 +1,45 @@ +/* +Copyright 2020 Google LLC. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dataresidency + +import ( + "context" + "testing" + + "github.com/google/go-cmp/cmp" + logtesting "knative.dev/pkg/logging/testing" + + . "knative.dev/pkg/configmap/testing" +) + +func TestStoreLoadWithContext(t *testing.T) { + store := NewStore(logtesting.TestLogger(t)) + + _, defaultsConfig := ConfigMapsFromTestFile(t, configName, defaulterKey) + + store.OnConfigChanged(defaultsConfig) + + config := FromContextOrDefaults(store.ToContext(context.Background())) + + t.Run("defaults", func(t *testing.T) { + expected, _ := NewDefaultsConfigFromConfigMap(defaultsConfig) + if diff := cmp.Diff(expected, config.DataResidencyDefaults); diff != "" { + t.Errorf("Unexpected defaults config (-want, +got): %v", diff) + t.Fatalf("Unexpected defaults config (-want, +got): %v", diff) + } + }) +} diff --git a/pkg/apis/configs/dataresidency/testdata/config-dataresidency.yaml b/pkg/apis/configs/dataresidency/testdata/config-dataresidency.yaml new file mode 100644 index 0000000000..06ae60fd64 --- /dev/null +++ b/pkg/apis/configs/dataresidency/testdata/config-dataresidency.yaml @@ -0,0 +1,54 @@ +# Copyright 2020 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: v1 +kind: ConfigMap +metadata: + name: config-dataresidency + namespace: cloud-run-events +data: + default-dataresidency-config: | + clusterDefaults: + messagestoragepolicy.allowedpersistenceregions: [] + _example: | + ################################ + # # + # EXAMPLE CONFIGURATION # + # # + ################################ + + # This block is not actually functional configuration, + # but serves to illustrate the available configuration + # options and document them in a way that is accessible + # to users that `kubectl edit` this config map. + # + # These sample configuration options may be copied out of + # this example block and unindented to be in the data block + # to actually change the configuration. + + # default-dataresidency-config is the configuration for determining the default + # data residency to apply to all objects that require data residency but, + # do not specify it. This is expected to be Channels and Sources. + # + # We only support cluster scoped now + default-dataresidency-config: | + # clusterDefaults are the defaults to apply to every namespace in the + # cluster + clusterDefaults: + # messagestoragepolicy.allowedpersistenceregions field specifies + # all the allowed regions for data residency. The default or an empty value will + # mean no data residency requirement. + messagestoragepolicy.allowedpersistenceregions: + - us-east1 + - us-west1 diff --git a/pkg/apis/configs/dataresidency/zz_generated.deepcopy.go b/pkg/apis/configs/dataresidency/zz_generated.deepcopy.go new file mode 100644 index 0000000000..8136ff30dc --- /dev/null +++ b/pkg/apis/configs/dataresidency/zz_generated.deepcopy.go @@ -0,0 +1,59 @@ +// +build !ignore_autogenerated + +/* +Copyright 2020 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by deepcopy-gen. DO NOT EDIT. + +package dataresidency + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Defaults) DeepCopyInto(out *Defaults) { + *out = *in + in.ClusterDefaults.DeepCopyInto(&out.ClusterDefaults) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Defaults. +func (in *Defaults) DeepCopy() *Defaults { + if in == nil { + return nil + } + out := new(Defaults) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ScopedDefaults) DeepCopyInto(out *ScopedDefaults) { + *out = *in + if in.AllowedPersistenceRegions != nil { + in, out := &in.AllowedPersistenceRegions, &out.AllowedPersistenceRegions + *out = make([]string, len(*in)) + copy(*out, *in) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ScopedDefaults. +func (in *ScopedDefaults) DeepCopy() *ScopedDefaults { + if in == nil { + return nil + } + out := new(ScopedDefaults) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/reconciler/broker/broker.go b/pkg/reconciler/broker/broker.go index 0fe41a6151..bfd04b1313 100644 --- a/pkg/reconciler/broker/broker.go +++ b/pkg/reconciler/broker/broker.go @@ -22,6 +22,8 @@ import ( "context" "fmt" + "github.com/google/knative-gcp/pkg/apis/configs/dataresidency" + "cloud.google.com/go/pubsub" "github.com/google/knative-gcp/pkg/logging" "go.uber.org/multierr" @@ -56,6 +58,8 @@ type Reconciler struct { // pubsubClient is used as the Pubsub client when present. pubsubClient *pubsub.Client + + dataresidencyStore *dataresidency.Store } // Check that Reconciler implements Interface @@ -145,6 +149,13 @@ func (r *Reconciler) reconcileDecouplingTopicAndSubscription(ctx context.Context // Check if topic exists, and if not, create it. topicID := resources.GenerateDecouplingTopicName(b) topicConfig := &pubsub.TopicConfig{Labels: labels} + if r.dataresidencyStore != nil { + if dataresidencyConfig := r.dataresidencyStore.Load(); dataresidencyConfig != nil { + if dataresidencyConfig.DataResidencyDefaults.ComputeAllowedPersistenceRegions(topicConfig) { + logging.FromContext(ctx).Debug("Updated Topic Config AllowedPersistenceRegions for Broker", zap.Any("topicConfig", *topicConfig)) + } + } + } topic, err := pubsubReconciler.ReconcileTopic(ctx, topicID, topicConfig, b, &b.Status) if err != nil { return err diff --git a/pkg/reconciler/broker/broker_test.go b/pkg/reconciler/broker/broker_test.go index 29743fe762..09abb535f7 100644 --- a/pkg/reconciler/broker/broker_test.go +++ b/pkg/reconciler/broker/broker_test.go @@ -21,6 +21,7 @@ import ( "fmt" "testing" + "cloud.google.com/go/pubsub" "github.com/google/knative-gcp/pkg/broker/ingress" corev1 "k8s.io/api/core/v1" @@ -38,6 +39,7 @@ import ( . "knative.dev/pkg/reconciler/testing" brokerv1beta1 "github.com/google/knative-gcp/pkg/apis/broker/v1beta1" + "github.com/google/knative-gcp/pkg/apis/configs/dataresidency" "github.com/google/knative-gcp/pkg/client/injection/ducks/duck/v1alpha1/resource" brokerreconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/broker/v1beta1/broker" "github.com/google/knative-gcp/pkg/reconciler" @@ -301,9 +303,58 @@ func TestAllCases(t *testing.T) { "pre": []PubsubAction{}, }, PostConditions: []func(*testing.T, *TableRow){ - TopicExists("cre-bkr_testnamespace_test-broker_abc123"), + TopicExistsWithConfig("cre-bkr_testnamespace_test-broker_abc123", &pubsub.TopicConfig{ + Labels: map[string]string{ + "broker_class": "googlecloud", "name": "test-broker", "namespace": "testnamespace", "resource": "brokers", + }, + }), SubscriptionExists("cre-bkr_testnamespace_test-broker_abc123"), }, + }, { + Name: "Check topic config with correct data residency and label", + Key: testKey, + Objects: []runtime.Object{ + NewBroker(brokerName, testNS, + WithBrokerClass(brokerv1beta1.BrokerClass), + WithBrokerUID(testUID), + WithBrokerDeliverySpec(brokerDeliverySpec), + WithBrokerSetDefaults), + NewBrokerCell(resources.DefaultBrokerCellName, systemNS, + WithBrokerCellReady, + WithBrokerCellSetDefaults), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewBroker(brokerName, testNS, + WithBrokerClass(brokerv1beta1.BrokerClass), + WithBrokerUID(testUID), + WithBrokerDeliverySpec(brokerDeliverySpec), + WithBrokerReadyURI(brokerAddress), + WithBrokerSetDefaults, + ), + }}, + WantEvents: []string{ + brokerFinalizerUpdatedEvent, + Eventf(corev1.EventTypeNormal, "TopicCreated", `Created PubSub topic "cre-bkr_testnamespace_test-broker_abc123"`), + Eventf(corev1.EventTypeNormal, "SubscriptionCreated", `Created PubSub subscription "cre-bkr_testnamespace_test-broker_abc123"`), + brokerReconciledEvent, + }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(testNS, brokerName, brokerFinalizerName), + }, + OtherTestData: map[string]interface{}{ + "pre": []PubsubAction{}, + "dataResidencyConfigMap": NewDataresidencyConfigMapFromRegions([]string{"us-east1"}), + }, + PostConditions: []func(*testing.T, *TableRow){ + TopicExistsWithConfig("cre-bkr_testnamespace_test-broker_abc123", &pubsub.TopicConfig{ + MessageStoragePolicy: pubsub.MessageStoragePolicy{ + AllowedPersistenceRegions: []string{"us-east1"}, + }, + Labels: map[string]string{ + "broker_class": "googlecloud", "name": "test-broker", "namespace": "testnamespace", "resource": "brokers", + }, + }), + }, }} table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher, testData map[string]interface{}) controller.Reconciler { @@ -319,14 +370,20 @@ func TestAllCases(t *testing.T) { } } } + // If we found "dataResidencyConfigMap" in OtherData, we create a store with the configmap + var drStore *dataresidency.Store + if cm, ok := testData["dataResidencyConfigMap"]; ok { + drStore = NewDataresidencyTestStore(t, cm.(*corev1.ConfigMap)) + } ctx = addressable.WithDuck(ctx) ctx = resource.WithDuck(ctx) r := &Reconciler{ - Base: reconciler.NewBase(ctx, controllerAgentName, cmw), - brokerCellLister: listers.GetBrokerCellLister(), - projectID: testProject, - pubsubClient: psclient, + Base: reconciler.NewBase(ctx, controllerAgentName, cmw), + brokerCellLister: listers.GetBrokerCellLister(), + projectID: testProject, + pubsubClient: psclient, + dataresidencyStore: drStore, } return brokerreconciler.NewReconciler(ctx, r.Logger, r.RunClientSet, listers.GetBrokerLister(), r.Recorder, r, brokerv1beta1.BrokerClass) })) diff --git a/pkg/reconciler/broker/controller.go b/pkg/reconciler/broker/controller.go index cea647fe59..cade095778 100644 --- a/pkg/reconciler/broker/controller.go +++ b/pkg/reconciler/broker/controller.go @@ -20,6 +20,8 @@ import ( "context" "os" + "github.com/google/knative-gcp/pkg/apis/configs/dataresidency" + "cloud.google.com/go/pubsub" "go.uber.org/zap" "k8s.io/apimachinery/pkg/labels" @@ -72,10 +74,13 @@ func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl }() } + dataresidencySingleton := &dataresidency.StoreSingleton{} + r := &Reconciler{ - Base: reconciler.NewBase(ctx, controllerAgentName, cmw), - brokerCellLister: bcInformer.Lister(), - pubsubClient: client, + Base: reconciler.NewBase(ctx, controllerAgentName, cmw), + brokerCellLister: bcInformer.Lister(), + pubsubClient: client, + dataresidencyStore: dataresidencySingleton.Store(ctx, cmw), } impl := brokerreconciler.NewImpl(ctx, r, brokerv1beta1.BrokerClass) diff --git a/pkg/reconciler/broker/controller_test.go b/pkg/reconciler/broker/controller_test.go index 86c7501eac..c9b5dcd6f2 100644 --- a/pkg/reconciler/broker/controller_test.go +++ b/pkg/reconciler/broker/controller_test.go @@ -19,6 +19,8 @@ package broker import ( "testing" + "github.com/google/knative-gcp/pkg/apis/configs/dataresidency" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/pkg/configmap" @@ -58,6 +60,13 @@ func TestNew(t *testing.T) { }, Data: map[string]string{}, }, + &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: dataresidency.ConfigMapName(), + Namespace: system.Namespace(), + }, + Data: map[string]string{}, + }, )) if c == nil { diff --git a/pkg/reconciler/intevents/topic/controller.go b/pkg/reconciler/intevents/topic/controller.go index e79e21999d..9acc7e0daa 100644 --- a/pkg/reconciler/intevents/topic/controller.go +++ b/pkg/reconciler/intevents/topic/controller.go @@ -29,6 +29,7 @@ import ( "knative.dev/pkg/logging" tracingconfig "knative.dev/pkg/tracing/config" + "github.com/google/knative-gcp/pkg/apis/configs/dataresidency" "github.com/google/knative-gcp/pkg/apis/configs/gcpauth" v1 "github.com/google/knative-gcp/pkg/apis/intevents/v1" gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub" @@ -60,9 +61,9 @@ type envConfig struct { type Constructor injection.ControllerConstructor // NewConstructor creates a constructor to make a Topic controller. -func NewConstructor(ipm iam.IAMPolicyManager, gcpas *gcpauth.StoreSingleton) Constructor { +func NewConstructor(ipm iam.IAMPolicyManager, gcpas *gcpauth.StoreSingleton, dataresidencyss *dataresidency.StoreSingleton) Constructor { return func(ctx context.Context, cmw configmap.Watcher) *controller.Impl { - return newController(ctx, cmw, ipm, gcpas.Store(ctx, cmw)) + return newController(ctx, cmw, ipm, gcpas.Store(ctx, cmw), dataresidencyss.Store(ctx, cmw)) } } @@ -71,6 +72,7 @@ func newController( cmw configmap.Watcher, ipm iam.IAMPolicyManager, gcpas *gcpauth.Store, + dataresidencyStore *dataresidency.Store, ) *controller.Impl { topicInformer := topicinformer.Get(ctx) serviceInformer := serviceinformer.Get(ctx) @@ -88,12 +90,13 @@ func newController( } r := &Reconciler{ - PubSubBase: pubsubBase, - Identity: identity.NewIdentity(ctx, ipm, gcpas), - topicLister: topicInformer.Lister(), - serviceLister: serviceInformer.Lister(), - publisherImage: env.Publisher, - createClientFn: gpubsub.NewClient, + PubSubBase: pubsubBase, + Identity: identity.NewIdentity(ctx, ipm, gcpas), + dataresidencyStore: dataresidencyStore, + topicLister: topicInformer.Lister(), + serviceLister: serviceInformer.Lister(), + publisherImage: env.Publisher, + createClientFn: gpubsub.NewClient, } impl := topicreconciler.NewImpl(ctx, r) diff --git a/pkg/reconciler/intevents/topic/controller_test.go b/pkg/reconciler/intevents/topic/controller_test.go index 51123eefc6..ba9a9f5e3f 100644 --- a/pkg/reconciler/intevents/topic/controller_test.go +++ b/pkg/reconciler/intevents/topic/controller_test.go @@ -20,7 +20,7 @@ import ( "os" "testing" - iamtesting "github.com/google/knative-gcp/pkg/reconciler/testing" + reconcilertesting "github.com/google/knative-gcp/pkg/reconciler/testing" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/pkg/system" @@ -50,7 +50,7 @@ func TestNew(t *testing.T) { }, Data: map[string]string{}, }) - c := newController(ctx, cmw, iamtesting.NoopIAMPolicyManager, iamtesting.NewGCPAuthTestStore(t, nil)) + c := newController(ctx, cmw, reconcilertesting.NoopIAMPolicyManager, reconcilertesting.NewGCPAuthTestStore(t, nil), reconcilertesting.NewDataresidencyTestStore(t, nil)) if c == nil { t.Fatal("Expected newControllerWithIAMPolicyManager to return a non-nil value") diff --git a/pkg/reconciler/intevents/topic/topic.go b/pkg/reconciler/intevents/topic/topic.go index 1082149fb1..fe135ff233 100644 --- a/pkg/reconciler/intevents/topic/topic.go +++ b/pkg/reconciler/intevents/topic/topic.go @@ -21,6 +21,7 @@ import ( "encoding/json" "fmt" + "cloud.google.com/go/pubsub" "go.uber.org/zap" "google.golang.org/grpc/codes" corev1 "k8s.io/api/core/v1" @@ -42,6 +43,7 @@ import ( gstatus "google.golang.org/grpc/status" + "github.com/google/knative-gcp/pkg/apis/configs/dataresidency" v1 "github.com/google/knative-gcp/pkg/apis/intevents/v1" topicreconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/intevents/v1/topic" listers "github.com/google/knative-gcp/pkg/client/listers/intevents/v1" @@ -67,6 +69,8 @@ type Reconciler struct { *intevents.PubSubBase // identity reconciler for reconciling workload identity. *identity.Identity + // data residency store + dataresidencyStore *dataresidency.Store // topicLister index properties about topics. topicLister listers.TopicLister // serviceLister index properties about services. @@ -156,8 +160,16 @@ func (r *Reconciler) reconcileTopic(ctx context.Context, topic *v1.Topic) error logging.FromContext(ctx).Desugar().Error("Topic does not exist and the topic policy doesn't allow creation") return fmt.Errorf("Topic %q does not exist and the topic policy doesn't allow creation", topic.Spec.Topic) } else { + topicConfig := &pubsub.TopicConfig{} + if r.dataresidencyStore != nil { + if dataresidencyCfg := r.dataresidencyStore.Load(); dataresidencyCfg != nil { + if dataresidencyCfg.DataResidencyDefaults.ComputeAllowedPersistenceRegions(topicConfig) { + r.Logger.Debugw("Updated Topic Config AllowedPersistenceRegions for topic reconciler", zap.Any("topicConfig", *topicConfig)) + } + } + } // Create a new topic with the given name. - t, err = client.CreateTopic(ctx, topic.Spec.Topic) + t, err = client.CreateTopicWithConfig(ctx, topic.Spec.Topic, topicConfig) if err != nil { // For some reason (maybe some cache invalidation thing), sometimes t.Exists returns that the topic // doesn't exist but it actually does. When we try to create it again, it fails with an AlreadyExists diff --git a/pkg/reconciler/testing/configmap.go b/pkg/reconciler/testing/configmap.go index 86ceeb54c5..319491b6f4 100644 --- a/pkg/reconciler/testing/configmap.go +++ b/pkg/reconciler/testing/configmap.go @@ -17,8 +17,10 @@ limitations under the License. package testing import ( + "github.com/google/knative-gcp/pkg/apis/configs/dataresidency" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "knative.dev/pkg/system" ) // ConfigMapOption enables further configuration of a ConfigMap. @@ -67,3 +69,28 @@ func WithConfigMapBinaryDataEntry(key string, value []byte) ConfigMapOption { cm.BinaryData[key] = value } } + +// NewDataresidencyConfigMapFromRegions Create new data residency configuration map +// from list of allowed persistence regions +func NewDataresidencyConfigMapFromRegions(regions []string) *corev1.ConfigMap { + // Note that the data is in yaml, so no tab is allowed, use spaces instead. + configData := ` + clusterDefaults: + messagestoragepolicy.allowedpersistenceregions:` + if regions == nil || len(regions) == 0 { + configData += " []" + } else { + for _, region := range regions { + configData += "\n - " + region + } + } + return &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: dataresidency.ConfigMapName(), + Namespace: system.Namespace(), + }, + Data: map[string]string{ + "default-dataresidency-config": configData, + }, + } +} diff --git a/pkg/reconciler/testing/pstest.go b/pkg/reconciler/testing/pstest.go index 2dadd3f943..fea84c53db 100644 --- a/pkg/reconciler/testing/pstest.go +++ b/pkg/reconciler/testing/pstest.go @@ -72,6 +72,23 @@ func TopicExists(id string) func(*testing.T, *rtesting.TableRow) { } } +func TopicExistsWithConfig(id string, expectedTopicConfig *pubsub.TopicConfig) func(*testing.T, *rtesting.TableRow) { + return func(t *testing.T, r *rtesting.TableRow) { + c := getPubsubClient(r) + topic := c.Topic(id) + exist, err := topic.Exists(context.Background()) + if err != nil { + t.Errorf("Error checking topic existence: %v", err) + } else if !exist { + t.Errorf("Expected topic %q to exist", id) + } + topicConfig, err := topic.Config(context.Background()) + if diff := cmp.Diff(*expectedTopicConfig, topicConfig); diff != "" { + t.Errorf("Wrong topic config expected %v, got %v", *expectedTopicConfig, topicConfig) + } + } +} + func OnlyTopics(ids ...string) func(*testing.T, *rtesting.TableRow) { return func(t *testing.T, r *rtesting.TableRow) { c := getPubsubClient(r) diff --git a/pkg/reconciler/testing/store.go b/pkg/reconciler/testing/store.go index 238871b370..eaaa6da724 100644 --- a/pkg/reconciler/testing/store.go +++ b/pkg/reconciler/testing/store.go @@ -22,6 +22,7 @@ import ( corev1 "k8s.io/api/core/v1" logtesting "knative.dev/pkg/logging/testing" + "github.com/google/knative-gcp/pkg/apis/configs/dataresidency" "github.com/google/knative-gcp/pkg/apis/configs/gcpauth" ) @@ -32,3 +33,11 @@ func NewGCPAuthTestStore(t *testing.T, config *corev1.ConfigMap) *gcpauth.Store } return gcpAuthTestStore } + +func NewDataresidencyTestStore(t *testing.T, config *corev1.ConfigMap) *dataresidency.Store { + dataresidencyTestStore := dataresidency.NewStore(logtesting.TestLogger(t)) + if config != nil { + dataresidencyTestStore.OnConfigChanged(config) + } + return dataresidencyTestStore +} diff --git a/pkg/reconciler/trigger/controller.go b/pkg/reconciler/trigger/controller.go index b4364eb0e7..647e1c6758 100644 --- a/pkg/reconciler/trigger/controller.go +++ b/pkg/reconciler/trigger/controller.go @@ -38,6 +38,7 @@ import ( "knative.dev/pkg/resolver" brokerv1beta1 "github.com/google/knative-gcp/pkg/apis/broker/v1beta1" + "github.com/google/knative-gcp/pkg/apis/configs/dataresidency" brokerinformer "github.com/google/knative-gcp/pkg/client/injection/informers/broker/v1beta1/broker" triggerinformer "github.com/google/knative-gcp/pkg/client/injection/informers/broker/v1beta1/trigger" triggerreconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/broker/v1beta1/trigger" @@ -90,12 +91,13 @@ func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl client.Close() }() } - + dataresidencySingleton := &dataresidency.StoreSingleton{} r := &Reconciler{ - Base: reconciler.NewBase(ctx, controllerAgentName, cmw), - brokerLister: brokerinformer.Get(ctx).Lister(), - pubsubClient: client, - projectID: projectID, + Base: reconciler.NewBase(ctx, controllerAgentName, cmw), + brokerLister: brokerinformer.Get(ctx).Lister(), + pubsubClient: client, + projectID: projectID, + dataresidencyStore: dataresidencySingleton.Store(ctx, cmw), } impl := triggerreconciler.NewImpl(ctx, r, withAgentAndFinalizer) diff --git a/pkg/reconciler/trigger/controller_test.go b/pkg/reconciler/trigger/controller_test.go index 10f7cfb80f..adb33a8c62 100644 --- a/pkg/reconciler/trigger/controller_test.go +++ b/pkg/reconciler/trigger/controller_test.go @@ -29,6 +29,7 @@ import ( tracingconfig "knative.dev/pkg/tracing/config" // Fake injection informers + "github.com/google/knative-gcp/pkg/apis/configs/dataresidency" _ "github.com/google/knative-gcp/pkg/client/injection/informers/broker/v1beta1/broker/fake" _ "github.com/google/knative-gcp/pkg/client/injection/informers/broker/v1beta1/trigger/fake" _ "knative.dev/pkg/client/injection/ducks/duck/v1/addressable/fake" @@ -65,6 +66,13 @@ func TestNew(t *testing.T) { }, Data: map[string]string{}, }, + &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: dataresidency.ConfigMapName(), + Namespace: system.Namespace(), + }, + Data: map[string]string{}, + }, )) if c == nil { diff --git a/pkg/reconciler/trigger/trigger.go b/pkg/reconciler/trigger/trigger.go index 9f59934aa0..1912a9ce0e 100644 --- a/pkg/reconciler/trigger/trigger.go +++ b/pkg/reconciler/trigger/trigger.go @@ -36,6 +36,7 @@ import ( "cloud.google.com/go/pubsub" brokerv1beta1 "github.com/google/knative-gcp/pkg/apis/broker/v1beta1" + "github.com/google/knative-gcp/pkg/apis/configs/dataresidency" triggerreconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/broker/v1beta1/trigger" brokerlisters "github.com/google/knative-gcp/pkg/client/listers/broker/v1beta1" metadataClient "github.com/google/knative-gcp/pkg/gclient/metadata" @@ -81,6 +82,8 @@ type Reconciler struct { // pubsubClient is used as the Pubsub client when present. pubsubClient *pubsub.Client + + dataresidencyStore *dataresidency.Store } // Check that TriggerReconciler implements Interface @@ -223,6 +226,13 @@ func (r *Reconciler) reconcileRetryTopicAndSubscription(ctx context.Context, tri // Check if topic exists, and if not, create it. topicID := resources.GenerateRetryTopicName(trig) topicConfig := &pubsub.TopicConfig{Labels: labels} + if r.dataresidencyStore != nil { + if dataresidencyConfig := r.dataresidencyStore.Load(); dataresidencyConfig != nil { + if dataresidencyConfig.DataResidencyDefaults.ComputeAllowedPersistenceRegions(topicConfig) { + logging.FromContext(ctx).Debug("Updated Topic Config AllowedPersistenceRegions for Trigger", zap.Any("topicConfig", *topicConfig)) + } + } + } topic, err := pubsubReconciler.ReconcileTopic(ctx, topicID, topicConfig, trig, &trig.Status) if err != nil { return err diff --git a/pkg/reconciler/trigger/trigger_test.go b/pkg/reconciler/trigger/trigger_test.go index 85825eb998..d664e8aba9 100644 --- a/pkg/reconciler/trigger/trigger_test.go +++ b/pkg/reconciler/trigger/trigger_test.go @@ -42,6 +42,7 @@ import ( "knative.dev/pkg/resolver" brokerv1beta1 "github.com/google/knative-gcp/pkg/apis/broker/v1beta1" + "github.com/google/knative-gcp/pkg/apis/configs/dataresidency" "github.com/google/knative-gcp/pkg/client/injection/ducks/duck/v1alpha1/resource" triggerreconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/broker/v1beta1/trigger" "github.com/google/knative-gcp/pkg/reconciler" @@ -377,6 +378,79 @@ func TestAllCasesTrigger(t *testing.T) { MaxDeliveryAttempts: 3, DeadLetterTopic: "projects/test-project-id/topics/test-dead-letter-topic-id", }), + TopicExistsWithConfig("cre-tgr_testnamespace_test-trigger_abc123", &pubsub.TopicConfig{ + Labels: map[string]string{ + "name": "test-trigger", "namespace": "testnamespace", "resource": "triggers", + }, + }), + }, + }, + { + Name: "Check topic config and labels", + Key: testKey, + Objects: []runtime.Object{ + NewBroker(brokerName, testNS, + WithBrokerClass(brokerv1beta1.BrokerClass), + WithInitBrokerConditions, + WithBrokerReady("url"), + WithBrokerDeliverySpec(brokerDeliverySpec), + WithBrokerSetDefaults, + ), + makeSubscriberAddressableAsUnstructured(), + NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(testUID), + WithTriggerSubscriberRef(subscriberGVK, subscriberName, testNS), + WithTriggerSetDefaults), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(testUID), + WithTriggerSubscriberRef(subscriberGVK, subscriberName, testNS), + WithTriggerBrokerReady, + WithTriggerSubscriptionReady, + WithTriggerTopicReady, + WithTriggerDependencyReady, + WithTriggerSubscriberResolvedSucceeded, + WithTriggerStatusSubscriberURI(subscriberURI), + WithTriggerSetDefaults, + ), + }}, + WantEvents: []string{ + triggerFinalizerUpdatedEvent, + topicCreatedEvent, + subscriptionCreatedEvent, + triggerReconciledEvent, + }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(testNS, triggerName, finalizerName), + }, + OtherTestData: map[string]interface{}{ + "pre": []PubsubAction{ + Topic("test-dead-letter-topic-id"), + }, + "dataResidencyConfigMap": NewDataresidencyConfigMapFromRegions([]string{"us-east1"}), + }, + PostConditions: []func(*testing.T, *TableRow){ + OnlyTopics("cre-tgr_testnamespace_test-trigger_abc123", "test-dead-letter-topic-id"), + OnlySubscriptions("cre-tgr_testnamespace_test-trigger_abc123"), + SubscriptionHasRetryPolicy("cre-tgr_testnamespace_test-trigger_abc123", + &pubsub.RetryPolicy{ + MaximumBackoff: 5 * time.Second, + MinimumBackoff: 5 * time.Second, + }), + SubscriptionHasDeadLetterPolicy("cre-tgr_testnamespace_test-trigger_abc123", + &pubsub.DeadLetterPolicy{ + MaxDeliveryAttempts: 3, + DeadLetterTopic: "projects/test-project-id/topics/test-dead-letter-topic-id", + }), + TopicExistsWithConfig("cre-tgr_testnamespace_test-trigger_abc123", &pubsub.TopicConfig{ + MessageStoragePolicy: pubsub.MessageStoragePolicy{ + AllowedPersistenceRegions: []string{"us-east1"}, + }, + Labels: map[string]string{ + "name": "test-trigger", "namespace": "testnamespace", "resource": "triggers", + }, + }), }, }, } @@ -385,6 +459,7 @@ func TestAllCasesTrigger(t *testing.T) { // Insert pubsub client for PostConditions and create fixtures psclient, close := TestPubsubClient(ctx, testProject) t.Cleanup(close) + var drStore *dataresidency.Store if testData != nil { InjectPubsubClient(testData, psclient) if testData["pre"] != nil { @@ -393,6 +468,11 @@ func TestAllCasesTrigger(t *testing.T) { f(ctx, t, psclient) } } + + // If we found "dataResidencyConfigMap" in OtherData, we create a store with the configmap + if cm, ok := testData["dataResidencyConfigMap"]; ok { + drStore = NewDataresidencyTestStore(t, cm.(*corev1.ConfigMap)) + } } ctx = addressable.WithDuck(ctx) @@ -407,6 +487,7 @@ func TestAllCasesTrigger(t *testing.T) { uriResolver: resolver.NewURIResolver(ctx, func(types.NamespacedName) {}), projectID: testProject, pubsubClient: psclient, + dataresidencyStore: drStore, } return triggerreconciler.NewReconciler(ctx, r.Logger, r.RunClientSet, listers.GetTriggerLister(), r.Recorder, r, withAgentAndFinalizer(nil))