
tshd-initiated communication for Connect #16043

Closed

ravicious wants to merge 6 commits into master from ravicious/tshd-initiated-comm

Conversation

@ravicious (Member) commented Sep 1, 2022

This PR includes a draft of tshd-initiated communication which will be used for various new features in Teleport Connect, such as automatically asking for relogin when attempting to connect to a db proxy after the cert has expired (gravitational/webapps.e#299) and per-session MFA (gravitational/teleport.e#874).

In both scenarios the user sets up a db proxy with Connect and then interacts with the proxy but not through Connect's UI. As such, the app itself has no way of knowing that such an interaction took place.

The PR has a couple of commits, but this description covers only the most relevant parts. bfa55ff is just a squash merge of #15398.

webapps counterpart: gravitational/webapps#1176

How it works

We add a server-streaming gRPC RPC, ClusterEvents, to tshd. On launch, the frontend app starts tshd, and the first call it makes is to establish the cluster events stream. tshd creates a special channel for writing messages to the stream. The gRPC handler reads from this channel while the stream is active and sends each message over the stream.

func (s *Handler) ClusterEvents(_ *api.ClusterEventsRequest, stream api.TerminalService_ClusterEventsServer) error {
    // TODO: Move the logic from this handler to a separate place.
    log := s.DaemonService.Log().WithField(trace.Component, "conn:cevents")
    log.Info("Opened the stream.")

    for {
        select {
        case <-stream.Context().Done():
            log.Info("The client has disconnected, closing the stream.")
            return nil
        case clusterEvent, ok := <-s.DaemonService.ClusterEventsC():
            if !ok {
                log.Info("The ClusterEvents channel has been closed, closing the stream.")
                return nil
            }

            log.Debugf("Sending a message: %v", clusterEvent)

            if err := stream.Send(clusterEvent); err != nil {
                log.WithError(err).Error("Failed to send a message, closing the stream.")
                return err
            }
        }
    }
}

In this draft, the only event sent over the wire is a notification about a new connection made through the proxy.

// TODO: Add test that makes sure NewWithLocalPort properly copies OnNewConnection.
OnNewConnection: func(gatewayURI uri.ResourceURI, targetURI string) {
    // TODO: Refactor targetURI from string to uri.ResourceURI.
    // https://github.com/gravitational/teleport/issues/15953
    clusterURI, err := uri.ParseClusterURI(targetURI)
    if err != nil {
        s.Log().WithError(err).Warnf(
            "Malformed gateway target URI (%s), could not parse it into uri.ResourceURI",
            targetURI,
        )
        return
    }

    clusterEvent := &api.ClusterEvent{
        ClusterUri: clusterURI.String(),
        Event: &api.ClusterEvent_NewGatewayConnectionAccepted{
            NewGatewayConnectionAccepted: &api.NewGatewayConnectionAccepted{
                GatewayUri: gatewayURI.String(),
                TargetUri:  targetURI,
            },
        },
    }

    s.Log().Debug("Sending ClusterEvent to clusterEventsC")
    s.clusterEventsC <- clusterEvent
    s.Log().Debug("Message sent to clusterEventsC")
},

Additionally, we make sure that only one cluster events stream can be active at a time.

func (s *Handler) ClusterEvents(_ *api.ClusterEventsRequest, stream api.TerminalService_ClusterEventsServer) error {
    // TODO: Move the logic from this handler to a separate place.
    log := s.DaemonService.Log().WithField(trace.Component, "conn:cevents")

    select {
    case <-time.After(time.Second):
        message := "Couldn't acquire lock within a second, another stream is already active."
        log.Info(message)
        return trace.AlreadyExists(message)
    case <-stream.Context().Done():
        log.Info("The client has disconnected while waiting for the lock, closing the stream.")
        return nil
    case s.DaemonService.ClusterEventsSem <- struct{}{}:
        defer func() { <-s.DaemonService.ClusterEventsSem }()
    }

    log.Info("Opened the stream.")

Channel handling

Making sure there's always a reader

Now, one of the problems with this design is that if the stream is not active, nothing reads from the channel. This means that any place that tries to write to it will block until something reads from the channel again. In the worst case, this would prevent a gateway from accepting new connections.

To prevent this, Connect does two things:

  1. Establishing the stream is the first step of the communication between Connect and tshd. The frontend app doesn't send any other request to tshd until the stream is established.
  2. The frontend app is going to automatically attempt to re-establish the stream if it gets closed due to an error (see the sketch after this list). The only exception is the error resulting from attempting to open a stream when one is already active.
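
The frontend side lives in TypeScript (grpc-js), but the rough shape of that retry loop, sketched in Go to match the other snippets here, could look like the following. handleClusterEvent and nextBackoff are hypothetical helpers, not part of this PR; status and codes come from google.golang.org/grpc/status and google.golang.org/grpc/codes.

func consumeClusterEvents(ctx context.Context, client api.TerminalServiceClient) error {
    for {
        stream, err := client.ClusterEvents(ctx, &api.ClusterEventsRequest{})
        if err == nil {
            for {
                event, recvErr := stream.Recv()
                if recvErr != nil {
                    err = recvErr
                    break
                }
                handleClusterEvent(event) // hypothetical dispatch to the UI
            }
        }
        if status.Code(err) == codes.AlreadyExists {
            // Another stream is already active, don't retry.
            return err
        }
        // Any other error: back off, then re-establish the stream.
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(nextBackoff()):
        }
    }
}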

Preventing channel deadlocks and panics

daemon.Service is the channel owner. Deadlocking by writing to a nil channel, or panicking by closing a nil channel, is avoided by having daemon.Service instantiate the channel before instantiating anything else that can write to it.
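
A minimal sketch of that instantiation order; the Service fields shown are assumptions for illustration:

func New(cfg Config) (*Service, error) {
    s := &Service{
        cfg: cfg,
        // Create the channel first so no writer can ever observe it as nil.
        clusterEventsC: make(chan *api.ClusterEvent),
    }

    // Only now instantiate anything that can write to the channel,
    // e.g. restore gateways from the previous session.

    return s, nil
}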

Panicking by writing to a closed channel is avoided by closing anything that writes to the channel before closing the channel itself. For now, this just means closing the gateways first.

func (s *Service) Stop() {
    s.mu.RLock()
    defer s.mu.RUnlock()

    s.Log().Info("Stopping")

    for _, gateway := range s.gateways {
        gateway.Close()
    }

    close(s.clusterEventsC)
}

daemon.Service is started and stopped only once during the lifetime of tshd, so we won't be closing the channel more than once.

The channel handed to places other than daemon.Service is explicitly made read-only at the type level.

func (s *Service) ClusterEventsC() <-chan *api.ClusterEvent {
    return s.clusterEventsC
}

The implications of long-lived streams

I was asked whether I researched this or tried it out in practice. I did some research, but I have yet to try it out in practice; I completely forgot to mention that in the PR. I'm going to update this section tomorrow.

Also, I haven't even considered just adding a gRPC server to Connect and a client to tshd, but this might be worth considering as well.

Updates gravitational/webapps.e#299

Comment on lines +295 to +301
message ClusterEvent {
  string cluster_uri = 1;

  oneof event {
    CertExpired cert_expired = 2;
    NewGatewayConnectionAccepted new_gateway_connection_accepted = 3;
  }
}
ravicious (Member Author):

This is just a draft version of this message. For now I'm fairly sure that all messages will be cluster-centric. Though rewriting them to support messages that are not tied to any particular cluster shouldn't be hard.

        return nil
    case clusterEvent, ok := <-s.DaemonService.ClusterEventsC():
        if !ok {
            log.Info("The ClusterEvents channel has been closed, closing the stream.")
ravicious (Member Author):

This will typically happen only when the app is closing.

Comment on lines +178 to +182
s.Log().Debug("Sending ClusterEvent to clusterEventsC")

s.clusterEventsC <- clusterEvent

s.Log().Debug("Message sent to clusterEventsC")
ravicious (Member Author):

I might as well remove those debug logs in the final version. I kept them here to debug some deadlocks that I encountered while writing the first version of this code.

Comment on lines +421 to +422
func (s *Service) ClusterEventsC() <-chan *api.ClusterEvent {
    return s.clusterEventsC
ravicious (Member Author):

Since I'm going to move the logic from the gRPC handler somewhere else, the request to create a stream will likely pass through daemon.Service. In that case, we won't even need this method. Instead, we'll just pass the channel to the struct that will handle reading from the channel and writing to the stream.

Comment on lines +30 to +33
    case <-time.After(time.Second):
        message := "Couldn't acquire lock within a second, another stream is already active."
        log.Info(message)
        return trace.AlreadyExists(message)
ravicious (Member Author):

I don't need to actually wait for a second, I could technically just bail out as soon as I discover that the lock couldn't be acquired.

I was considering using sync.Mutex.TryLock for this but its documentation made me research cases where it's actually useful and it didn't seem like the best fit here.

Would this work though?

select {
case s.DaemonService.ClusterEventsSem <- struct{}{}:
    // …
default:
    return trace.AlreadyExists("Couldn't acquire lock")
}

@capnspacehook (Contributor) commented Sep 1, 2022

I think that would work, I don't see why you'd need to wait for a second and try to send on the channel multiple times.

Would it work to use an atomic integer as a boolean to specify whether a goroutine is active? Or a sync/atomic.Bool if we can use Go 1.19? That would make the logic slightly easier to reason about, imo.
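
For reference, a minimal sketch of that suggestion, assuming a hypothetical streamActive field of type sync/atomic.Bool (Go 1.19+) on the handler:

func (s *Handler) ClusterEvents(_ *api.ClusterEventsRequest, stream api.TerminalService_ClusterEventsServer) error {
    // CompareAndSwap flips false -> true only if no other stream holds the flag.
    if !s.streamActive.CompareAndSwap(false, true) {
        return trace.AlreadyExists("another stream is already active")
    }
    defer s.streamActive.Store(false)

    // ...proceed with the stream loop...
    return nil
}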

ravicious (Member Author):

I think that would work, I don't see why you'd need to wait for a second and try to send on the channel multiple times.

This was purely because of my inexperience with channels. I forgot that I could use default here and I remembered it just as I was typing my comment. 😅

Though as Tiago said on Slack, sync.Mutex.TryLock would be perfectly good here and, as evidenced by the discussion under my post, probably clearer than any other approach.


s.Log().Debug("Sending ClusterEvent to clusterEventsC")

s.clusterEventsC <- clusterEvent
Contributor:

clusterEventsC is an unbuffered channel. If no stream is listening, couldn't handling of the connection be blocked?

ravicious (Member Author):

Right, that's the problem I described in the "Making sure there's always a reader" section.

Using a buffer seemed to only postpone dealing with this problem, so I didn't opt for it.

In case of a recoverable stream error, we expect that the frontend app is going to establish a new stream ASAP. If that's not the case then we have a serious problem with the application.

With that in mind, perhaps it'd be a good idea to do this: inside the callback, spin up a new goroutine. The goroutine tries to write to the channel. If that's not possible within three seconds (an arbitrary amount of time), it disregards the message, logs the error, and quits.
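
A minimal sketch of that idea, reusing the names from the snippets above; the three-second deadline is the arbitrary value mentioned here:

// Inside the OnNewConnection callback.
go func() {
    select {
    case s.clusterEventsC <- clusterEvent:
        // Delivered to the active stream reader.
    case <-time.After(3 * time.Second):
        // No reader showed up in time; drop the event rather than
        // block the gateway connection forever.
        s.Log().Warn("Dropping cluster event, no active stream.")
    }
}()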


OTOH I don't see why we'd want the connection to proceed when there's a fundamental problem with the app. As I said above, we expect any intermittent issues with the tshd -> Connect connection to be resolved pretty quickly, so if it blocks on that write for a couple hundred milliseconds, then I think that's fine.


Now that I've written all of this, I understand that in the case of an unrecoverable issue, we might actually want to stop this goroutine from being blocked. I suppose I could expose net.Conn to this function and close the connection if we can't write to the channel within some deadline. Does that sound good?

Contributor:

Sorry, I missed that part in the description. I think it's ok to add net.Conn if that solves the situation. Personally, I would try to decouple the dependency of the stream when handling a new connection, and handle the problem of a broken stream elsewhere. Obviously, I am not familiar enough with all the flows so I don't know if there is a better place =p.

@codingllama (Contributor):

Discussed in private with @ravicious, but I'll leave some of my open Qs about the design here as well.

The "vanilla" approach to create a communication channel from tshd->Connect UI would be to make the latter a gRPC service itself. This means one extra socket, a few protos, and that's it. If we are going for a different approach, such as repurposing a streaming RPC as a long-lived connection, we should first carefully compare its pros and cons with the vanilla approach.

As for the streaming approach, I have the following concerns:

  1. What are the implications of a long-running stream? Are streams meant to be used in this way?

  2. The stream approach, over time, is likely to turn the stream into a mini-RPC service, with many disparate messages inside request/response oneofs. This is strictly worse than a standalone server for complexity and maintenance.

  3. Streams are finicky to implement and don't always error in intuitive ways - in particular, most "sends" seem to be fine until the next receive is issued, at least in Go. This is some extra complexity for the implementation to handle.

Bootstrapping "vanilla" takes a bit more work, but it's looking more attractive to me at the moment - plus, it seems to dodge a few implementation woes as well.

@ravicious (Member Author):

@codingllama

1. What are the implications of a long-running stream? Are streams meant to be used in this way?

I found a talk by Eric Anderson, one of the gRPC devs, about long-lived and streaming RPCs. This is the abstract of the talk:

Support for long-lived RPCs and streaming RPCs is a core benefit of using gRPC. While such RPCs have fundamental advantages, they also have inherent complications versus "normal" single-request, single-response RPCs. Learn when it is advantageous to use these more advanced features, potential gotchas, and ways to address them.

Most of the issues Eric talks about are related to use cases where the gRPC server is behind a load balancer and has multiple clients talking to it. For us it'll always be a 1:1 relation.

I also liked Eric's comment on SO, which is pretty close to what I've heard from you under different PRs over the past couple of days regarding streams vs unary RPCs:

(…) streaming adds complexity when errors occur, and in this case it will cause the RPCs to become long-lived which are more complicated to load balance. So you need to weigh whether the added complexity from streaming/long-lived RPCs provides a large enough benefit to your application.

We don't generally recommend using streaming RPCs for higher gRPC performance. It is true that sending a message on a stream is faster than a new unary RPC, but the improvement is fixed and has higher complexity. Instead, we recommend using streaming RPCs when it would provide higher application (your code) performance or lower application complexity.

In general, I think streams are not an inherently bad solution to this problem. I don't have a full understanding of how the complexity increases when errors occur but I'll come back to it in a second.

I feel like in the case of Connect the risks are minimized due to the connection being made locally between two processes. One issue would definitely be keeping the stream alive. But as I understand it, gRPC has built-in support for that, and grpc-js supports keepalive settings.
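
For illustration, server-side keepalive in grpc-go looks roughly like this; the durations are made up, not values Connect would necessarily ship with:

import (
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/keepalive"
)

func newServerWithKeepalive() *grpc.Server {
    return grpc.NewServer(
        // Ping an idle client so the long-lived stream doesn't go stale.
        grpc.KeepaliveParams(keepalive.ServerParameters{
            Time:    30 * time.Second, // ping after 30s of inactivity
            Timeout: 10 * time.Second, // wait 10s for the ping ack
        }),
        // Let the client (grpc-js) send its own keepalive pings as well.
        grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
            MinTime:             15 * time.Second,
            PermitWithoutStream: true,
        }),
    )
}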


2. The stream approach, over time, is likely to turn the stream into a mini-RPC service, with many disparate messages inside request/response oneofs. This is strictly worse than a standalone server for complexity and maintenance.

I imagine that the cluster events stream would be used merely for notifications about an event. tshd should never wait for Connect to do something in response to receiving an event. After receiving an event, Connect should invoke a unary RPC or create another stream (in the case of passwordless login for example). We'd never want to turn cluster events into a bidirectional stream.

Some of the potential features we'd like to use it for are:

  • The user attempts to connect to a db proxy after the db cert has expired. Connect should bring focus to its window and ask the user to relogin.
  • The user attempts to connect to a db proxy with per-session MFA turned on. Connect should prompt the user to complete the MFA challenge.
  • An access request gets approved. Connect should show a notification.

In the future we could have another stream to sync the state of cluster resources with the cluster to keep our client-side cache of resources in sync.

With that in mind, would having this mini-RPC service within a stream be that bad complexity-wise? Sure, the response oneof would grow, but from tshd's PoV it'd be only data. When it comes to handling different messages from Connect's perspective, I think the difference between using a stream vs the vanilla approach would be quite small.


3. Streams are finicky to implement and don't always error in intuitive ways - in particular, most "sends" seem to be fine until the next receive is issued, at least in Go. This is some extra complexity for the implementation to handle.

Do you have bidirectional streams in mind here? I'd be curious to hear more about the counterintuitive ways in which streams can fail as I don't have much experience with gRPC.

The way I picture this stream is pretty much what this PR looks like: it reads from a channel; on error, the client immediately tries to re-establish the stream (with exponential backoff supported OOTB). If keepalive works as expected, that's another point of failure off the table.

I suppose one concern would be making sure that events sent by tshd are actually delivered to Connect which is something that I think might not be guaranteed by gRPC streams, see grpc/grpc-go#2159 for example. Is this what you meant by "most sends seem to be fine…"?


I should probably just try out the vanilla approach once I'm back, to get a better picture of what its complexity would look like in practice.

From what I've seen (grpc/grpc-go#1345 (comment)) I can have a single client shared by multiple goroutines, so the current problem with no one reading from the channel goes away. I'd just call the client rather than putting a message on the channel.
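
Sketching what that could look like in the OnNewConnection callback; tshdEventsClient and ReportGatewayConnection are hypothetical names for illustration, not from this PR:

OnNewConnection: func(gatewayURI uri.ResourceURI, targetURI string) {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()

    // A plain unary call; no shared channel, so a temporarily unreachable
    // app can't block the gateway indefinitely.
    _, err := s.tshdEventsClient.ReportGatewayConnection(ctx, &api.ReportGatewayConnectionRequest{
        GatewayUri: gatewayURI.String(),
        TargetUri:  targetURI,
    })
    if err != nil {
        s.Log().WithError(err).Warn("Failed to notify the app about a new connection.")
    }
},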

Setting up the vanilla option on Windows will be somewhat more cumbersome because Node.js doesn't support Unix sockets there, and named pipes have their own issues too, so our only option is TCP (which in turn means managing certs, but we do that already).

I still feel somewhat more favorable towards the stream vs vanilla but if, given what I said here, streams have some inherent issues that make this whole thing much harder to maintain then I'm fine with the vanilla approach. It's hard to weigh the options since I've never used either approach. But this only makes me appreciate your help even more.

@codingllama (Contributor):

@ravicious, a few replies:

I found a talk by Eric Anderson (...)

Thanks, sounds great! Saved for later. :)

Re streams finicky:

Do you have bidirectional streams in mind here?

Yes.

events sent by tshd are actually delivered to Connect which is something that I think might not be guaranteed by gRPC streams (...)

YMMV, but I noticed that client-side sends tend to not error even when the request fails - it's only in the stream.Recv() call that we get the error back. I would do some practical tests, but it may be the case that you'll need ack-style messages to make sure both ends of the stream worked as expected. (That's for Go, no idea what goes on with Js clients.)
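
A toy illustration of that behavior, using a hypothetical client-streaming RPC; in grpc-go, once the RPC is broken, Send tends to return a bare io.EOF, and the real status only surfaces on the final receive:

func uploadAll(ctx context.Context, client api.EventsClient, events []*api.Event) error {
    stream, err := client.UploadEvents(ctx) // hypothetical client-streaming RPC
    if err != nil {
        return err
    }

    for _, event := range events {
        // Send may succeed (or return io.EOF) even if the server has
        // already rejected the RPC; it doesn't carry the real error.
        if err := stream.Send(event); err != nil {
            break
        }
    }

    // The actual status (e.g. PermissionDenied) only shows up here.
    _, err = stream.CloseAndRecv()
    return err
}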

Re streams in general:

My initial impulse would be to only go for streams when the circumstances prohibit other approaches (despite recent designs, which I'm beginning to regret for the example they set). For example, if we really have 1-N messages or if the server couldn't dial directly to the client (eg, unreachable network).

In all of my time at the G I don't think I wrote a single streaming RPC. It's anecdotal, but that should give you an idea of how rare it can be.

Re vanilla:

Yep, give it a try. At the very least it should give you better grounds for comparison.

Sharing clients is perfectly fine - the underlying connection pool takes care of it for you. We can have concurrent client-side calls without worry (Connect UI likely has to hold some locks, but that happens either way). It should also eliminate issues like having to keep connections alive at all costs, manually redialing if the stream drops, etc.

@ravicious (Member Author):

As I mentioned in my status updates, I did give the vanilla approach a try and we're going to use that approach.

The cons:

  • We run a gRPC server in the renderer process of the Electron app.
    • This introduces a risk of the UI being blocked if the server uses too many resources. We don't expect the server to do particularly heavy work though, seeing as it'll be used for a 1:1 connection sending small notifications.
    • There's always the option of moving the server to a separate process if it does need to do heavy work. That requires some additional plumbing and time investment, probably for little benefit at the moment.
    • This also means we cannot easily enable sandbox for the process that's running the server.
      • However, we're already limited in this area by running a gRPC client in the renderer process that's intended for a Node.js environment (grpc-js). With grpc-web requiring you to run an Envoy proxy, I assume this would blow up the app size and further complicate the setup. So I don't see us dropping grpc-js in favor of enabling sandbox anytime soon.
  • Increased setup complexity
    • On Windows we use gRPC over TCP with mTLS. I've had to refactor our mTLS setup substantially; it assumed that the renderer only creates gRPC clients and the tsh daemon only hosts a gRPC server.
    • There's additional orchestration required around starting the server in the renderer, passing the address to tshd, creating a client in tshd, and so on.

The pros:

  • After the setup is done, the rest of the code is dead simple, unlike with the stream solution. The renderer process no longer needs to re-establish the stream if an error happens, and so on.
  • Deserializing simple gRPC request messages is much easier and more idiomatic OOTB. With streams, we'd likely run into nested oneofs, which require special unwrapping on the JS side.
  • The entire problem of the stream channel being blocked if the stream is down is gone. We simply don't have to worry about it.
    • This was replaced with the need to set up a client. However, as I mentioned, we only need to care about that during app startup and nowhere else in the code.

Overall, the solution with a separate gRPC server feels much sturdier. I've already submitted PRs that refactor the gRPC cert code to accommodate the changes (#16782, gravitational/webapps#1220). I'm going to push another one in a couple of days which actually sets up the whole server.

Thanks again Alan for the help!

@ravicious closed this on Oct 3, 2022
@ravicious deleted the ravicious/tshd-initiated-comm branch on January 5, 2023