Skip to content

Commit 084e955

Browse files
authored
Add remote-discovery service mirroring (#11201)
We add the ability to mirror services in "remote discovery" mode where no Endpoints are created for the service in the source cluster, but instead the `multicluster.linkerd.io/remote-discovery` and `multicluster.linkerd.io/remote-service` labels are set on the mirror service to indicate that the control plane should perform remote discovery for this service. To accomplish this, we add a new field to the Link resource: `remoteDiscoverySelector` which is a parallel to `selector` but instead it selects Services to export in remote discovery mode. Since this field is purely additive, we do not change the Link CRD version. By treating an empty selector as "Nothing", we remain backwards compatible (an unset `remoteDiscoverySelector` will not export any services in remote discovery mode). Signed-off-by: Alex Leong <[email protected]>
1 parent bc8b4f2 commit 084e955

File tree

10 files changed

+440
-120
lines changed

10 files changed

+440
-120
lines changed

multicluster/charts/linkerd-multicluster/templates/link-crd.yaml

+28
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,34 @@ spec:
7777
type: array
7878
items:
7979
type: string
80+
remoteDiscoverySelector:
81+
description: Selector for Services to mirror in remote discovery mode
82+
type: object
83+
properties:
84+
matchLabels:
85+
type: object
86+
x-kubernetes-preserve-unknown-fields: true
87+
matchExpressions:
88+
description: List of selector requirements
89+
type: array
90+
items:
91+
description: A selector item requires a key and an operator
92+
type: object
93+
required:
94+
- key
95+
- operator
96+
properties:
97+
key:
98+
description: Label key that selector should apply to
99+
type: string
100+
operator:
101+
description: Evaluation of a label in relation to set
102+
type: string
103+
enum: [In, NotIn, Exists, DoesNotExist]
104+
values:
105+
type: array
106+
items:
107+
type: string
80108
targetClusterName:
81109
description: Name of target cluster to link to
82110
type: string

multicluster/cmd/link.go

+9
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ type (
4747
controlPlaneVersion string
4848
dockerRegistry string
4949
selector string
50+
remoteDiscoverySelector string
5051
gatewayAddresses string
5152
gatewayPort uint32
5253
ha bool
@@ -250,6 +251,11 @@ A full list of configurable values can be found at https://github.com/linkerd/li
250251
return err
251252
}
252253

254+
remoteDiscoverySelector, err := metav1.ParseToLabelSelector(opts.remoteDiscoverySelector)
255+
if err != nil {
256+
return err
257+
}
258+
253259
link := mc.Link{
254260
Name: opts.clusterName,
255261
Namespace: opts.namespace,
@@ -262,6 +268,7 @@ A full list of configurable values can be found at https://github.com/linkerd/li
262268
GatewayIdentity: gatewayIdentity,
263269
ProbeSpec: probeSpec,
264270
Selector: *selector,
271+
RemoteDiscoverySelector: *remoteDiscoverySelector,
265272
}
266273

267274
obj, err := link.ToUnstructured()
@@ -326,6 +333,7 @@ A full list of configurable values can be found at https://github.com/linkerd/li
326333
cmd.Flags().StringVar(&opts.dockerRegistry, "registry", opts.dockerRegistry,
327334
fmt.Sprintf("Docker registry to pull service mirror controller image from ($%s)", flags.EnvOverrideDockerRegistry))
328335
cmd.Flags().StringVarP(&opts.selector, "selector", "l", opts.selector, "Selector (label query) to filter which services in the target cluster to mirror")
336+
cmd.Flags().StringVar(&opts.remoteDiscoverySelector, "remote-discovery-selector", opts.remoteDiscoverySelector, "Selector (label query) to filter which services in the target cluster to mirror in remote discovery mode")
329337
cmd.Flags().StringVar(&opts.gatewayAddresses, "gateway-addresses", opts.gatewayAddresses, "If specified, overwrites gateway addresses when gateway service is not type LoadBalancer (comma separated list)")
330338
cmd.Flags().Uint32Var(&opts.gatewayPort, "gateway-port", opts.gatewayPort, "If specified, overwrites gateway port when gateway service is not type LoadBalancer")
331339
cmd.Flags().BoolVar(&opts.ha, "ha", opts.ha, "Enable HA configuration for the service-mirror deployment (default false)")
@@ -424,6 +432,7 @@ func newLinkOptionsWithDefault() (*linkOptions, error) {
424432
logLevel: defaults.LogLevel,
425433
logFormat: defaults.LogFormat,
426434
selector: fmt.Sprintf("%s=%s", k8s.DefaultExportedServiceSelector, "true"),
435+
remoteDiscoverySelector: fmt.Sprintf("%s=%s", k8s.DefaultExportedServiceSelector, "remote-discovery"),
427436
gatewayAddresses: "",
428437
gatewayPort: 0,
429438
ha: false,

multicluster/cmd/testdata/install_default.golden

+28
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

multicluster/cmd/testdata/install_ha.golden

+28
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

multicluster/cmd/testdata/install_psp.golden

+28
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

multicluster/service-mirror/cluster_watcher.go

+88-27
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,11 @@ func (rcsw *RemoteClusterServiceWatcher) getMirroredServiceLabels(remoteService
235235
return labels
236236
}
237237

238+
if rcsw.isRemoteDiscovery(remoteService) {
239+
labels[consts.RemoteDiscoveryLabel] = rcsw.link.TargetClusterName
240+
labels[consts.RemoteServiceLabel] = remoteService.GetName()
241+
}
242+
238243
for key, value := range remoteService.ObjectMeta.Labels {
239244
if strings.HasPrefix(key, consts.SvcMirrorPrefix) {
240245
continue
@@ -430,27 +435,50 @@ func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceDeleted(ctx context.
430435
// new gateway being assigned or additional ports exposed. This method takes care of that.
431436
func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceUpdated(ctx context.Context, ev *RemoteServiceUpdated) error {
432437
rcsw.log.Infof("Updating mirror service %s/%s", ev.localService.Namespace, ev.localService.Name)
433-
gatewayAddresses, err := rcsw.resolveGatewayAddress()
434-
if err != nil {
435-
return err
436-
}
437438

438-
copiedEndpoints := ev.localEndpoints.DeepCopy()
439-
copiedEndpoints.Subsets = []corev1.EndpointSubset{
440-
{
441-
Addresses: gatewayAddresses,
442-
Ports: rcsw.getEndpointsPorts(ev.remoteUpdate),
443-
},
444-
}
439+
if rcsw.isRemoteDiscovery(ev.remoteUpdate) {
440+
// The service is mirrored in remote discovery mode and any local
441+
// endpoints for it should be deleted if they exist.
442+
if ev.localEndpoints != nil {
443+
err := rcsw.localAPIClient.Client.CoreV1().Endpoints(ev.localService.Namespace).Delete(ctx, ev.localService.Name, metav1.DeleteOptions{})
444+
if err != nil {
445+
return RetryableError{[]error{
446+
fmt.Errorf("failed to delete mirror endpoints for %s/%s: %w", ev.localService.Namespace, ev.localService.Name, err),
447+
}}
448+
}
449+
}
450+
} else if ev.localEndpoints == nil {
451+
// The service is mirrored in gateway mode and gateway endpoints should
452+
// be created for it.
453+
err := rcsw.createGatewayEndpoints(ctx, ev.remoteUpdate)
454+
if err != nil {
455+
return err
456+
}
457+
} else {
458+
// The service is mirrored in gateway mode and gateway endpoints already
459+
// exist for it but may need to be updated.
460+
gatewayAddresses, err := rcsw.resolveGatewayAddress()
461+
if err != nil {
462+
return err
463+
}
445464

446-
if copiedEndpoints.Annotations == nil {
447-
copiedEndpoints.Annotations = make(map[string]string)
448-
}
449-
copiedEndpoints.Annotations[consts.RemoteGatewayIdentity] = rcsw.link.GatewayIdentity
465+
copiedEndpoints := ev.localEndpoints.DeepCopy()
466+
copiedEndpoints.Subsets = []corev1.EndpointSubset{
467+
{
468+
Addresses: gatewayAddresses,
469+
Ports: rcsw.getEndpointsPorts(ev.remoteUpdate),
470+
},
471+
}
450472

451-
err = rcsw.updateMirrorEndpoints(ctx, copiedEndpoints)
452-
if err != nil {
453-
return RetryableError{[]error{err}}
473+
if copiedEndpoints.Annotations == nil {
474+
copiedEndpoints.Annotations = make(map[string]string)
475+
}
476+
copiedEndpoints.Annotations[consts.RemoteGatewayIdentity] = rcsw.link.GatewayIdentity
477+
478+
err = rcsw.updateMirrorEndpoints(ctx, copiedEndpoints)
479+
if err != nil {
480+
return RetryableError{[]error{err}}
481+
}
454482
}
455483

456484
ev.localService.Labels = rcsw.getMirroredServiceLabels(ev.remoteUpdate)
@@ -518,6 +546,10 @@ func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceCreated(ctx context.
518546
}
519547
}
520548

549+
if rcsw.isRemoteDiscovery(remoteService) {
550+
// For remote discovery services, skip creating gateway endpoints.
551+
return nil
552+
}
521553
return rcsw.createGatewayEndpoints(ctx, remoteService)
522554
}
523555

@@ -657,15 +689,19 @@ func (rcsw *RemoteClusterServiceWatcher) createOrUpdateService(service *corev1.S
657689
lastMirroredRemoteVersion, ok := localService.Annotations[consts.RemoteResourceVersionAnnotation]
658690
if ok && lastMirroredRemoteVersion != service.ResourceVersion {
659691
endpoints, err := rcsw.localAPIClient.Endpoint().Lister().Endpoints(service.Namespace).Get(localName)
660-
if err == nil {
661-
rcsw.eventsQueue.Add(&RemoteServiceUpdated{
662-
localService: localService,
663-
localEndpoints: endpoints,
664-
remoteUpdate: service,
665-
})
666-
return nil
692+
if err != nil {
693+
if kerrors.IsNotFound(err) {
694+
endpoints = nil
695+
} else {
696+
return RetryableError{[]error{err}}
697+
}
667698
}
668-
return RetryableError{[]error{err}}
699+
rcsw.eventsQueue.Add(&RemoteServiceUpdated{
700+
localService: localService,
701+
localEndpoints: endpoints,
702+
remoteUpdate: service,
703+
})
704+
return nil
669705
}
670706

671707
return nil
@@ -1174,10 +1210,35 @@ func (rcsw *RemoteClusterServiceWatcher) updateReadiness(endpoints *corev1.Endpo
11741210
}
11751211

11761212
func (rcsw *RemoteClusterServiceWatcher) isExported(l map[string]string) bool {
1213+
// Treat an empty selector as "Nothing" instead of "Everything" so that
1214+
// when the selector field is unset, we don't export all Services.
1215+
if len(rcsw.link.Selector.MatchExpressions)+len(rcsw.link.Selector.MatchLabels) == 0 {
1216+
return false
1217+
}
11771218
selector, err := metav1.LabelSelectorAsSelector(&rcsw.link.Selector)
11781219
if err != nil {
11791220
rcsw.log.Errorf("Invalid selector: %s", err)
11801221
return false
11811222
}
1182-
return selector.Matches(labels.Set(l))
1223+
remoteDiscoverySelector, err := metav1.LabelSelectorAsSelector(&rcsw.link.RemoteDiscoverySelector)
1224+
if err != nil {
1225+
rcsw.log.Errorf("Invalid selector: %s", err)
1226+
return false
1227+
}
1228+
return selector.Matches(labels.Set(l)) || remoteDiscoverySelector.Matches(labels.Set(l))
1229+
}
1230+
1231+
func (rcsw *RemoteClusterServiceWatcher) isRemoteDiscovery(svc *corev1.Service) bool {
1232+
// Treat an empty remoteDisocverySelector as "Nothing" instead of
1233+
// "Everything" so that when the remoteDiscoverySelector field is unset, we
1234+
// don't export all Services.
1235+
if len(rcsw.link.RemoteDiscoverySelector.MatchExpressions)+len(rcsw.link.RemoteDiscoverySelector.MatchLabels) == 0 {
1236+
return false
1237+
}
1238+
remoteDiscoverySelector, err := metav1.LabelSelectorAsSelector(&rcsw.link.RemoteDiscoverySelector)
1239+
if err != nil {
1240+
rcsw.log.Errorf("Invalid selector: %s", err)
1241+
return false
1242+
}
1243+
return remoteDiscoverySelector.Matches(labels.Set(svc.Labels))
11831244
}

0 commit comments

Comments
 (0)