Skip to content

Commit 752d1c9

Browse files
authored
Add tests for federated service watcher (#13329)
Adds tests for the federated service watcher that exercise having remote and local clusters join and leave a federated service and ensuring that the correct proxy API updates are emitted. Signed-off-by: Alex Leong <[email protected]>
1 parent 80d28a4 commit 752d1c9

File tree

3 files changed

+424
-4
lines changed

3 files changed

+424
-4
lines changed

controller/api/destination/federated_service_watcher.go

+28-4
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ type federatedServiceWatcher struct {
2727
localEndpoints *watcher.EndpointsWatcher
2828

2929
log *logging.Entry
30+
31+
sync.RWMutex
3032
}
3133

3234
type remoteDiscoveryID struct {
@@ -109,10 +111,14 @@ func (fsw *federatedServiceWatcher) Subscribe(
109111
endStream chan struct{},
110112
) error {
111113
id := watcher.ServiceID{Namespace: namespace, Name: service}
114+
fsw.RLock()
112115
if federatedService, ok := fsw.services[id]; ok {
116+
fsw.RUnlock()
113117
fsw.log.Debugf("Subscribing to federated service %s/%s", namespace, service)
114118
federatedService.subscribe(port, nodeName, instanceID, stream, endStream)
115119
return nil
120+
} else {
121+
fsw.RUnlock()
116122
}
117123
return fmt.Errorf("service %s/%s is not a federated service", namespace, service)
118124
}
@@ -123,9 +129,13 @@ func (fsw *federatedServiceWatcher) Unsubscribe(
123129
stream pb.Destination_GetServer,
124130
) {
125131
id := watcher.ServiceID{Namespace: namespace, Name: service}
132+
fsw.RLock()
126133
if federatedService, ok := fsw.services[id]; ok {
134+
fsw.RUnlock()
127135
fsw.log.Debugf("Unsubscribing from federated service %s/%s", namespace, service)
128136
federatedService.unsubscribe(stream)
137+
} else {
138+
fsw.RUnlock()
129139
}
130140
}
131141

@@ -137,20 +147,27 @@ func (fsw *federatedServiceWatcher) addService(obj interface{}) {
137147
}
138148

139149
if isFederatedService(service) {
150+
fsw.Lock()
140151
if federatedService, ok := fsw.services[id]; ok {
152+
fsw.Unlock()
141153
fsw.log.Debugf("Updating federated service %s/%s", service.Namespace, service.Name)
142154
federatedService.update(service)
143155
} else {
144156
fsw.log.Debugf("Adding federated service %s/%s", service.Namespace, service.Name)
145157
federatedService = fsw.newFederatedService(service)
146158
fsw.services[id] = federatedService
159+
fsw.Unlock()
147160
federatedService.update(service)
148161
}
149162
} else {
163+
fsw.Lock()
150164
if federatedService, ok := fsw.services[id]; ok {
165+
delete(fsw.services, id)
166+
fsw.Unlock()
151167
fsw.log.Debugf("Service %s/%s is no longer a federated service", service.Namespace, service.Name)
152168
federatedService.delete()
153-
delete(fsw.services, id)
169+
} else {
170+
fsw.Unlock()
154171
}
155172
}
156173
}
@@ -178,10 +195,15 @@ func (fsw *federatedServiceWatcher) deleteService(obj interface{}) {
178195
Namespace: service.Namespace,
179196
Name: service.Name,
180197
}
198+
fsw.Lock()
181199
if federatedService, ok := fsw.services[id]; ok {
182-
federatedService.delete()
183200
delete(fsw.services, id)
201+
fsw.Unlock()
202+
federatedService.delete()
203+
} else {
204+
fsw.Unlock()
184205
}
206+
185207
}
186208

187209
func (fsw *federatedServiceWatcher) newFederatedService(service *corev1.Service) *federatedService {
@@ -268,6 +290,9 @@ func (fs *federatedService) subscribe(
268290
stream pb.Destination_GetServer,
269291
endStream chan struct{},
270292
) {
293+
fs.Lock()
294+
defer fs.Unlock()
295+
271296
syncStream := newSyncronizedGetStream(stream, fs.log)
272297
syncStream.Start()
273298

@@ -287,8 +312,6 @@ func (fs *federatedService) subscribe(
287312
fs.localDiscoverySubscribe(&subscriber, fs.localDiscovery)
288313
}
289314

290-
fs.Lock()
291-
defer fs.Unlock()
292315
fs.subscribers = append(fs.subscribers, subscriber)
293316
}
294317

@@ -322,6 +345,7 @@ func (fs *federatedService) remoteDiscoverySubscribe(
322345
remoteWatcher, remoteConfig, found := fs.clusterStore.Get(id.cluster)
323346
if !found {
324347
fs.log.Errorf("Failed to get remote cluster %s", id.cluster)
348+
return
325349
}
326350

327351
translator := newEndpointTranslator(

0 commit comments

Comments
 (0)