diff --git a/controller/api/destination/watcher/cluster_store.go b/controller/api/destination/watcher/cluster_store.go
new file mode 100644
index 0000000000000..945ece39371ce
--- /dev/null
+++ b/controller/api/destination/watcher/cluster_store.go
@@ -0,0 +1,271 @@
+package watcher
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"sync"
+
+	"github.com/linkerd/linkerd2/controller/k8s"
+	logging "github.com/sirupsen/logrus"
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/clientcmd"
+
+	consts "github.com/linkerd/linkerd2/pkg/k8s"
+)
+
+type (
+	// ClusterStore indexes clusters in which remote service discovery may be
+	// performed. For each store item, an EndpointsWatcher is created to read
+	// state directly from the respective cluster's API Server. In addition,
+	// each store item has some associated and immutable configuration that is
+	// required for service discovery.
+	ClusterStore struct {
+		// Protects against illegal accesses
+		sync.RWMutex
+
+		k8sAPI               *k8s.API
+		store                map[string]remoteCluster
+		enableEndpointSlices bool
+		log                  *logging.Entry
+
+		// Function used to parse a kubeconfig from a byte buffer. Based on the
+		// kubeconfig, it creates API Server clients.
+		decodeFn configDecoder
+	}
+
+	// remoteCluster is a helper struct that represents a store item
+	remoteCluster struct {
+		watcher *EndpointsWatcher
+		config  clusterConfig
+
+		// Used to signal shutdown to the associated watcher's informers
+		stopCh chan<- struct{}
+	}
+
+	// clusterConfig holds immutable configuration for a given cluster
+	clusterConfig struct {
+		TrustDomain   string
+		ClusterDomain string
+	}
+
+	// configDecoder is the type of a function that, given a byte buffer,
+	// returns a pair of API Server clients. The cache uses this function to
+	// dynamically create clients after discovering a Secret.
+	configDecoder = func(data []byte, enableEndpointSlices bool) (*k8s.API, *k8s.MetadataAPI, error)
+)
+
+const (
+	clusterNameLabel        = "multicluster.linkerd.io/cluster-name"
+	trustDomainAnnotation   = "multicluster.linkerd.io/trust-domain"
+	clusterDomainAnnotation = "multicluster.linkerd.io/cluster-domain"
+)
+
+// NewClusterStore creates a new (empty) ClusterStore. It requires a
+// Kubernetes API Server client instantiated for the local cluster.
+//
+// When created, a pair of event handlers are registered for the local
+// cluster's Secret informer. The event handlers are responsible for driving
+// the discovery of remote clusters and their configuration.
+func NewClusterStore(k8sAPI *k8s.API, enableEndpointSlices bool) (*ClusterStore, error) {
+	return newClusterStoreWithDecoder(k8sAPI, enableEndpointSlices, decodeK8sConfigFromSecret)
+}
+
+// newClusterStoreWithDecoder is a helper function that allows the creation of
+// a store with an arbitrary `configDecoder` function.
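+// Injecting the decoder this way lets tests substitute a mock that builds
+// fake API clients instead of connecting to a real API Server.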
+func newClusterStoreWithDecoder(k8sAPI *k8s.API, enableEndpointSlices bool, decodeFn configDecoder) (*ClusterStore, error) {
+	cs := &ClusterStore{
+		store: make(map[string]remoteCluster),
+		log: logging.WithFields(logging.Fields{
+			"component": "cluster-store",
+		}),
+		enableEndpointSlices: enableEndpointSlices,
+		k8sAPI:               k8sAPI,
+		decodeFn:             decodeFn,
+	}
+
+	_, err := cs.k8sAPI.Secret().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			secret, ok := obj.(*v1.Secret)
+			if !ok {
+				cs.log.Errorf("Error processing 'Secret' object: got %#v, expected *corev1.Secret", obj)
+				return
+			}
+
+			if secret.Type != consts.MirrorSecretType {
+				cs.log.Tracef("Skipping Add event for 'Secret' object %s/%s: invalid type %s", secret.Namespace, secret.Name, secret.Type)
+				return
+			}
+
+			clusterName, found := secret.GetLabels()[clusterNameLabel]
+			if !found {
+				cs.log.Tracef("Skipping Add event for 'Secret' object %s/%s: missing \"%s\" label", secret.Namespace, secret.Name, clusterNameLabel)
+				return
+			}
+
+			if err := cs.addCluster(clusterName, secret); err != nil {
+				cs.log.Errorf("Error adding cluster %s to store: %v", clusterName, err)
+			}
+		},
+		DeleteFunc: func(obj interface{}) {
+			secret, ok := obj.(*v1.Secret)
+			if !ok {
+				// If the Secret was deleted while the watch was disconnected
+				// (for whatever reason) and the event was missed, the object
+				// is delivered as a 'DeletedFinalStateUnknown'. Its state may
+				// be stale, so it should be cleaned up.
+				tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
+				if !ok {
+					cs.log.Debugf("Unable to get object from DeletedFinalStateUnknown %#v", obj)
+					return
+				}
+				// If the tombstoned object is a `Secret`, we can delete it.
+				secret, ok = tombstone.Obj.(*v1.Secret)
+				if !ok {
+					cs.log.Debugf("DeletedFinalStateUnknown contained object that is not a Secret %#v", obj)
+					return
+				}
+			}
+
+			clusterName, found := secret.GetLabels()[clusterNameLabel]
+			if !found {
+				cs.log.Tracef("Skipping Delete event for 'Secret' object %s/%s: missing \"%s\" label", secret.Namespace, secret.Name, clusterNameLabel)
+				return
+			}
+
+			cs.removeCluster(clusterName)
+		},
+	})
+
+	if err != nil {
+		return nil, err
+	}
+
+	return cs, nil
+}
+
+// Get safely retrieves a store item given a cluster name.
+func (cs *ClusterStore) Get(clusterName string) (*EndpointsWatcher, clusterConfig, bool) {
+	cs.RLock()
+	defer cs.RUnlock()
+	cw, found := cs.store[clusterName]
+	return cw.watcher, cw.config, found
+}
+
+// removeCluster is triggered by the cache's Secret informer when a Secret is
+// removed. Given a cluster name, it removes the entry from the cache after
+// stopping the associated watcher.
+func (cs *ClusterStore) removeCluster(clusterName string) {
+	cs.Lock()
+	defer cs.Unlock()
+	r, found := cs.store[clusterName]
+	if !found {
+		return
+	}
+	r.watcher.removeHandlers()
+	close(r.stopCh)
+	delete(cs.store, clusterName)
+	cs.log.Tracef("Removed cluster %s from ClusterStore", clusterName)
+}
+
+// addCluster is triggered by the cache's Secret informer when a Secret is
+// discovered for the first time. Given a cluster name and a Secret object, it
+// creates an EndpointsWatcher for the remote cluster and starts syncing the
+// watcher's informers in the background.
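+// An error is returned if the Secret does not contain a kubeconfig or is
+// missing one of the required domain annotations.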
+func (cs *ClusterStore) addCluster(clusterName string, secret *v1.Secret) error {
+	data, found := secret.Data[consts.ConfigKeyName]
+	if !found {
+		return errors.New("missing kubeconfig file")
+	}
+
+	clusterDomain, found := secret.GetAnnotations()[clusterDomainAnnotation]
+	if !found {
+		return fmt.Errorf("missing \"%s\" annotation", clusterDomainAnnotation)
+	}
+
+	trustDomain, found := secret.GetAnnotations()[trustDomainAnnotation]
+	if !found {
+		return fmt.Errorf("missing \"%s\" annotation", trustDomainAnnotation)
+	}
+
+	remoteAPI, metadataAPI, err := cs.decodeFn(data, cs.enableEndpointSlices)
+	if err != nil {
+		return err
+	}
+
+	stopCh := make(chan struct{}, 1)
+	watcher, err := NewEndpointsWatcher(
+		remoteAPI,
+		metadataAPI,
+		logging.WithFields(logging.Fields{
+			"remote-cluster": clusterName,
+		}),
+		cs.enableEndpointSlices,
+	)
+	if err != nil {
+		return err
+	}
+
+	cs.Lock()
+	defer cs.Unlock()
+	cs.store[clusterName] = remoteCluster{
+		watcher,
+		clusterConfig{
+			trustDomain,
+			clusterDomain,
+		},
+		stopCh,
+	}
+
+	go func() {
+		remoteAPI.Sync(stopCh)
+		metadataAPI.Sync(stopCh)
+	}()
+
+	cs.log.Tracef("Added cluster %s to ClusterStore", clusterName)
+
+	return nil
+}
+
+// decodeK8sConfigFromSecret implements the decoder function type. Given a
+// byte buffer, it attempts to parse it as a kubeconfig file. If successful,
+// it returns a pair of API Server clients.
+func decodeK8sConfigFromSecret(data []byte, enableEndpointSlices bool) (*k8s.API, *k8s.MetadataAPI, error) {
+	cfg, err := clientcmd.RESTConfigFromKubeConfig(data)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	ctx := context.Background()
+	var remoteAPI *k8s.API
+	if enableEndpointSlices {
+		remoteAPI, err = k8s.InitializeAPIForConfig(
+			ctx,
+			cfg,
+			true,
+			k8s.ES, k8s.Pod, k8s.Svc, k8s.SP, k8s.Job, k8s.Srv,
+		)
+	} else {
+		remoteAPI, err = k8s.InitializeAPIForConfig(
+			ctx,
+			cfg,
+			true,
+			k8s.Endpoint, k8s.Pod, k8s.Svc, k8s.SP, k8s.Job, k8s.Srv,
+		)
+	}
+	if err != nil {
+		return nil, nil, err
+	}
+
+	metadataAPI, err := k8s.InitializeMetadataAPIForConfig(cfg, k8s.Node, k8s.RS)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	return remoteAPI, metadataAPI, nil
+}
diff --git a/controller/api/destination/watcher/cluster_store_test.go b/controller/api/destination/watcher/cluster_store_test.go
new file mode 100644
index 0000000000000..1f11fc7831a64
--- /dev/null
+++ b/controller/api/destination/watcher/cluster_store_test.go
@@ -0,0 +1,261 @@
+package watcher
+
+import (
+	"testing"
+	"time"
+
+	"github.com/linkerd/linkerd2/controller/k8s"
+)
+
+func CreateMockDecoder() configDecoder {
+	// Create a mock decoder with some random objects to satisfy client creation
+	return func(data []byte, enableEndpointSlices bool) (*k8s.API, *k8s.MetadataAPI, error) {
+		remoteAPI, err := k8s.NewFakeAPI([]string{}...)
+		if err != nil {
+			return nil, nil, err
+		}
+
+		metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
+		if err != nil {
+			return nil, nil, err
+		}
+
+		return remoteAPI, metadataAPI, nil
+	}
+}
+
+func TestClusterStoreHandlers(t *testing.T) {
+	for _, tt := range []struct {
+		name                 string
+		k8sConfigs           []string
+		enableEndpointSlices bool
+		expectedClusters     map[string]clusterConfig
+		deleteClusters       map[string]struct{}
+	}{
+		{
+			name: "add and remove remote watcher when Secret is valid",
+			k8sConfigs: []string{
+				validRemoteSecret,
+			},
+			enableEndpointSlices: true,
+			expectedClusters: map[string]clusterConfig{
+				"remote": {
+					TrustDomain:   "identity.org",
+					ClusterDomain: "cluster.local",
+				},
+			},
+			deleteClusters: map[string]struct{}{
+				"remote": {},
+			},
+		},
+		{
+			name: "add and remove more than one watcher when Secrets are valid",
+			k8sConfigs: []string{
+				validRemoteSecret,
+				validTargetSecret,
+			},
+			enableEndpointSlices: false,
+			expectedClusters: map[string]clusterConfig{
+				"remote": {
+					TrustDomain:   "identity.org",
+					ClusterDomain: "cluster.local",
+				},
+				"target": {
+					TrustDomain:   "cluster.target.local",
+					ClusterDomain: "cluster.target.local",
+				},
+			},
+			deleteClusters: map[string]struct{}{
+				"remote": {},
+			},
+		},
+		{
+			name: "malformed secrets shouldn't result in created watchers",
+			k8sConfigs: []string{
+				validRemoteSecret,
+				noClusterSecret,
+				noDomainSecret,
+				noIdentitySecret,
+				invalidTypeSecret,
+			},
+			enableEndpointSlices: true,
+			expectedClusters: map[string]clusterConfig{
+				"remote": {
+					TrustDomain:   "identity.org",
+					ClusterDomain: "cluster.local",
+				},
+			},
+			deleteClusters: map[string]struct{}{
+				"remote": {},
+			},
+		},
+	} {
+		tt := tt // Pin
+		t.Run(tt.name, func(t *testing.T) {
+			// TODO (matei): use namespace-scoped API here
+			k8sAPI, err := k8s.NewFakeAPI(tt.k8sConfigs...)
+			if err != nil {
+				t.Fatalf("NewFakeAPI returned an error: %s", err)
+			}
+
+			metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
+			if err != nil {
+				t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
+			}
+
+			cs, err := newClusterStoreWithDecoder(k8sAPI, tt.enableEndpointSlices, CreateMockDecoder())
+			if err != nil {
+				t.Fatalf("Unexpected error when starting watcher cache: %s", err)
+			}
+
+			k8sAPI.Sync(nil)
+			metadataAPI.Sync(nil)
+
+			// Wait for the update to be processed; there is no blocking call we can wait on
+			time.Sleep(50 * time.Millisecond)
+
+			cs.RLock()
+			actualLen := len(cs.store)
+			cs.RUnlock()
+
+			if actualLen != len(tt.expectedClusters) {
+				t.Fatalf("Unexpected error: expected to see %d cache entries, got: %d", len(tt.expectedClusters), actualLen)
+			}
+
+			for k, expected := range tt.expectedClusters {
+				_, cfg, found := cs.Get(k)
+				if !found {
+					t.Fatalf("Unexpected error: cluster %s is missing from the cache", k)
+				}
+
+				if cfg.ClusterDomain != expected.ClusterDomain {
+					t.Fatalf("Unexpected error: expected cluster domain %s for cluster '%s', got: %s", expected.ClusterDomain, k, cfg.ClusterDomain)
+				}
+
+				if cfg.TrustDomain != expected.TrustDomain {
+					t.Fatalf("Unexpected error: expected trust domain %s for cluster '%s', got: %s", expected.TrustDomain, k, cfg.TrustDomain)
+				}
+			}
+
+			// Handle delete events
+			if len(tt.deleteClusters) != 0 {
+				for k := range tt.deleteClusters {
+					watcher, _, found := cs.Get(k)
+					if !found {
+						t.Fatalf("Unexpected error: watcher %s should exist in the cache", k)
+					}
+					// Unfortunately, the mock k8s client does not propagate
+					// deletes, so we have to call remove directly.
+					cs.removeCluster(k)
+					// Give the watcher time to shut down gracefully
+					time.Sleep(50 * time.Millisecond)
+					var hasStopped bool
+					if tt.enableEndpointSlices {
+						hasStopped = watcher.k8sAPI.ES().Informer().IsStopped()
+					} else {
+						hasStopped = watcher.k8sAPI.Endpoint().Informer().IsStopped()
+					}
+					if !hasStopped {
+						t.Fatalf("Unexpected error: informers for watcher %s should be stopped", k)
+					}
+
+					if _, _, found := cs.Get(k); found {
+						t.Fatalf("Unexpected error: watcher %s should have been removed from the cache", k)
+					}
+				}
+			}
+		})
+	}
+}
+
+var validRemoteSecret = `
+apiVersion: v1
+kind: Secret
+type: mirror.linkerd.io/remote-kubeconfig
+metadata:
+  namespace: linkerd
+  name: remote-cluster-credentials
+  labels:
+    multicluster.linkerd.io/cluster-name: remote
+  annotations:
+    multicluster.linkerd.io/trust-domain: identity.org
+    multicluster.linkerd.io/cluster-domain: cluster.local
+data:
+  kubeconfig: dmVyeSB0b3Agc2VjcmV0IGluZm9ybWF0aW9uIGhlcmUK
+`
+
+var validTargetSecret = `
+apiVersion: v1
+kind: Secret
+type: mirror.linkerd.io/remote-kubeconfig
+metadata:
+  namespace: linkerd
+  name: target-cluster-credentials
+  labels:
+    multicluster.linkerd.io/cluster-name: target
+  annotations:
+    multicluster.linkerd.io/trust-domain: cluster.target.local
+    multicluster.linkerd.io/cluster-domain: cluster.target.local
+data:
+  kubeconfig: dmVyeSB0b3Agc2VjcmV0IGluZm9ybWF0aW9uIGhlcmUK
+`
+
+var noDomainSecret = `
+apiVersion: v1
+kind: Secret
+type: mirror.linkerd.io/remote-kubeconfig
+metadata:
+  namespace: linkerd
+  name: target-1-cluster-credentials
+  labels:
+    multicluster.linkerd.io/cluster-name: target-1
+  annotations:
+    multicluster.linkerd.io/trust-domain: cluster.local
+data:
+  kubeconfig: dmVyeSB0b3Agc2VjcmV0IGluZm9ybWF0aW9uIGhlcmUK
+`
+
+var noClusterSecret = `
+apiVersion: v1
+kind: Secret
+type: mirror.linkerd.io/remote-kubeconfig
+metadata:
+  namespace: linkerd
+  name: target-2-cluster-credentials
+  annotations:
+    multicluster.linkerd.io/trust-domain: cluster.local
+    multicluster.linkerd.io/cluster-domain: cluster.local
+data:
+  kubeconfig: dmVyeSB0b3Agc2VjcmV0IGluZm9ybWF0aW9uIGhlcmUK
+`
+
+var noIdentitySecret = `
+apiVersion: v1
+kind: Secret
+type: mirror.linkerd.io/remote-kubeconfig
+metadata:
+  namespace: linkerd
+  name: target-3-cluster-credentials
+  labels:
+    multicluster.linkerd.io/cluster-name: target-3
+  annotations:
+    multicluster.linkerd.io/cluster-domain: cluster.local
+data:
+  kubeconfig: dmVyeSB0b3Agc2VjcmV0IGluZm9ybWF0aW9uIGhlcmUK
+`
+
+var invalidTypeSecret = `
+apiVersion: v1
+kind: Secret
+type: kubernetes.io/tls
+metadata:
+  namespace: linkerd
+  name: target-cluster-credentials
+  labels:
+    multicluster.linkerd.io/cluster-name: target
+  annotations:
+    multicluster.linkerd.io/trust-domain: cluster.local
+    multicluster.linkerd.io/cluster-domain: cluster.local
+data:
+  kubeconfig: dmVyeSB0b3Agc2VjcmV0IGluZm9ybWF0aW9uIGhlcmUK
+`
diff --git a/controller/api/destination/watcher/endpoints_watcher.go b/controller/api/destination/watcher/endpoints_watcher.go
index ca3809cae8dbf..05bca4c4e0932 100644
--- a/controller/api/destination/watcher/endpoints_watcher.go
+++ b/controller/api/destination/watcher/endpoints_watcher.go
@@ -78,6 +78,18 @@ type (
 		log                  *logging.Entry
 		enableEndpointSlices bool
 		sync.RWMutex // This mutex protects modification of the map itself.
+
+		informerHandlers
+	}
+
+	// informerHandlers holds a registration handle for each informer handler
+	// that has been registered for the EndpointsWatcher.
+	// The registration handles are used to de-register the informer handlers
+	// when the EndpointsWatcher stops.
+	informerHandlers struct {
+		epHandle  cache.ResourceEventHandlerRegistration
+		svcHandle cache.ResourceEventHandlerRegistration
+		srvHandle cache.ResourceEventHandlerRegistration
 	}
 
 	// servicePublisher represents a service. It keeps a map of portPublishers
@@ -149,7 +161,8 @@ func NewEndpointsWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *log
 		}),
 	}
 
-	_, err := k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+	var err error
+	ew.svcHandle, err = k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc:    ew.addService,
 		DeleteFunc: ew.deleteService,
 		UpdateFunc: func(_, obj interface{}) { ew.addService(obj) },
@@ -158,7 +171,7 @@ func NewEndpointsWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *log
 		return nil, err
 	}
 
-	_, err = k8sAPI.Srv().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+	ew.srvHandle, err = k8sAPI.Srv().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc:    ew.addServer,
 		DeleteFunc: ew.deleteServer,
 		UpdateFunc: func(_, obj interface{}) { ew.addServer(obj) },
@@ -169,7 +182,7 @@ func NewEndpointsWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *log
 
 	if ew.enableEndpointSlices {
 		ew.log.Debugf("Watching EndpointSlice resources")
-		_, err := k8sAPI.ES().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		ew.epHandle, err = k8sAPI.ES().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 			AddFunc:    ew.addEndpointSlice,
 			DeleteFunc: ew.deleteEndpointSlice,
 			UpdateFunc: ew.updateEndpointSlice,
@@ -177,9 +190,10 @@ func NewEndpointsWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *log
 		if err != nil {
 			return nil, err
 		}
+
 	} else {
 		ew.log.Debugf("Watching Endpoints resources")
-		_, err = k8sAPI.Endpoint().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		ew.epHandle, err = k8sAPI.Endpoint().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 			AddFunc:    ew.addEndpoints,
 			DeleteFunc: ew.deleteEndpoints,
 			UpdateFunc: func(_, obj interface{}) { ew.addEndpoints(obj) },
@@ -232,6 +246,38 @@ func (ew *EndpointsWatcher) Unsubscribe(id ServiceID, port Port, hostname string
 	sp.unsubscribe(port, hostname, listener)
 }
 
+// removeHandlers de-registers any event handlers used by the
+// EndpointsWatcher's informers.
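+// It is called when a remote cluster is removed from the ClusterStore,
+// before the watcher's informers are shut down via their stop channel.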
+func (ew *EndpointsWatcher) removeHandlers() {
+	ew.Lock()
+	defer ew.Unlock()
+	if ew.svcHandle != nil {
+		if err := ew.k8sAPI.Svc().Informer().RemoveEventHandler(ew.svcHandle); err != nil {
+			ew.log.Errorf("Failed to remove Service informer event handlers: %s", err)
+		}
+	}
+
+	if ew.srvHandle != nil {
+		if err := ew.k8sAPI.Srv().Informer().RemoveEventHandler(ew.srvHandle); err != nil {
+			ew.log.Errorf("Failed to remove Server informer event handlers: %s", err)
+		}
+	}
+
+	if ew.epHandle != nil {
+		if ew.enableEndpointSlices {
+			if err := ew.k8sAPI.ES().Informer().RemoveEventHandler(ew.epHandle); err != nil {
+				ew.log.Errorf("Failed to remove EndpointSlice informer event handlers: %s", err)
+			}
+		} else {
+			if err := ew.k8sAPI.Endpoint().Informer().RemoveEventHandler(ew.epHandle); err != nil {
+				ew.log.Errorf("Failed to remove Endpoints informer event handlers: %s", err)
+			}
+		}
+	}
+}
+
 func (ew *EndpointsWatcher) addService(obj interface{}) {
 	service := obj.(*corev1.Service)
 	id := ServiceID{
diff --git a/controller/k8s/metadata_api.go b/controller/k8s/metadata_api.go
index 600fd4715a396..ce75412fcc235 100644
--- a/controller/k8s/metadata_api.go
+++ b/controller/k8s/metadata_api.go
@@ -18,6 +18,7 @@ import (
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/metadata"
 	"k8s.io/client-go/metadata/metadatainformer"
+	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/cache"
 )
 
@@ -57,6 +58,27 @@ func InitializeMetadataAPI(kubeConfig string, resources ...APIResource) (*Metada
 	return api, nil
 }
 
+// InitializeMetadataAPIForConfig builds a MetadataAPI client from an existing
+// *rest.Config; it is used to create clients for remote clusters.
+func InitializeMetadataAPIForConfig(kubeConfig *rest.Config, resources ...APIResource) (*MetadataAPI, error) {
+	client, err := metadata.NewForConfig(kubeConfig)
+	if err != nil {
+		return nil, err
+	}
+
+	api, err := newClusterScopedMetadataAPI(client, resources...)
+	if err != nil {
+		return nil, err
+	}
+
+	for _, gauge := range api.gauges {
+		if err := prometheus.Register(gauge); err != nil {
+			log.Warnf("failed to register Prometheus gauge %s: %s", gauge.Desc().String(), err)
+		}
+	}
+	return api, nil
+}
+
 func newClusterScopedMetadataAPI(
 	metadataClient metadata.Interface,
 	resources ...APIResource,
diff --git a/controller/k8s/test_helper.go b/controller/k8s/test_helper.go
index 559722dd96098..1276d62124f2d 100644
--- a/controller/k8s/test_helper.go
+++ b/controller/k8s/test_helper.go
@@ -37,6 +37,7 @@ func NewFakeAPI(configs ...string) (*API, error) {
 		Node,
 		ES,
 		Srv,
+		Secret,
 	), nil
 }