Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add federated service watcher #13267

Merged
merged 3 commits into from
Nov 8, 2024
Merged

Add federated service watcher #13267

merged 3 commits into from
Nov 8, 2024

Conversation

adleong
Copy link
Member

@adleong adleong commented Nov 5, 2024

We add support for federated services to the destination controller by adding a new FederatedServiceWatcher. When the destination controller receives a Get request for a Service with the multicluster.linkerd.io/remote-discovery and/or the multicluster.linkerd.io/local-discovery annotations, it subscribes to the FederatedServiceWatcher instead of subscribing to the EndpointsWatcher directly. The FederatedServiceWatcher watches the federated service for any changes to these annotations, and maintains the appropriate watches on the local EndpointWatcher and/or remote EndpointWatchers fetched through the ClusterStore.

This means that we will often have multiple EndpointTranslators writing to the same Get response stream. In order for a NoEndpoints message sent to one EndpointTranslator to not clobber the whole stream, we make a change where NoEndpoints messages are no longer sent to the response stream, but are replaced by a Remove message containing all of the addresses from that EndpointTranslator. This allows multiple EndpointTranslators to coexist on the same stream.

Signed-off-by: Alex Leong <[email protected]>
}()
}

func (s *synchronizedGetStream) Stop() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there are two problems with this:

  1. You are modifying a value in one go-routing and reading it in another. This value is not an atomic and is not guarded by a mutex so this is potentially racey, no ?
  2. Simply flipping the bool will not always work. Imagine you are blocked on waiting for update := <-s.ch and you want to stop the stream. You call stop but a value is never received through the update chan. You will hang forever, no ?

My advise is to implement the standard mechanism of supplying a closed chan and selecting over both the updates and the closed chan. This will ensure that things are thread-safe.

for {
 select {
   case <-stopChan:
	 // we are done
	 return
   case update := <-s.ch:
	 // send update
   }
}

Signed-off-by: Alex Leong <[email protected]>
@adleong adleong merged commit c66f83e into main Nov 8, 2024
38 checks passed
@adleong adleong deleted the alex/multidisco branch November 8, 2024 17:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants