Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 30 additions & 31 deletions lib/cache/collections.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,8 @@ func setupCollections(c *Cache, watches []types.WatchKind) (*cacheCollections, e
}
collections.byKind[resourceKind] = collections.integrations
case types.KindHeadlessAuthentication:
collections.byKind[resourceKind] = &genericCollection[*types.HeadlessAuthentication, noReader, headlessAuthenticationServiceExecutor]{cache: c, watch: watch}
// For headless authentications, we need only process events. We don't need to keep the cache up to date.
collections.byKind[resourceKind] = &genericCollection[*types.HeadlessAuthentication, noReader, noopExecutor]{cache: c, watch: watch}
case types.KindAccessList:
if c.AccessLists == nil {
return nil, trace.BadParameter("missing parameter AccessLists")
Expand Down Expand Up @@ -2309,36 +2310,6 @@ func (integrationsExecutor) getReader(cache *Cache, cacheOK bool) services.Integ

var _ executor[types.Integration, services.IntegrationsGetter] = integrationsExecutor{}

// headlessAuthenticationServiceExecutor reads and writes headless authentication
// resources through the cache's headless authentications service.
type headlessAuthenticationServiceExecutor struct{}

func (headlessAuthenticationServiceExecutor) getAll(ctx context.Context, cache *Cache, loadSecrets bool) ([]*types.HeadlessAuthentication, error) {
	// loadSecrets is ignored; headless authentications carry no separate secrets.
	return cache.headlessAuthenticationsCache.GetHeadlessAuthentications(ctx)
}

func (headlessAuthenticationServiceExecutor) upsert(ctx context.Context, cache *Cache, resource *types.HeadlessAuthentication) error {
	return cache.headlessAuthenticationsCache.UpsertHeadlessAuthentication(ctx, resource)
}

func (headlessAuthenticationServiceExecutor) delete(ctx context.Context, cache *Cache, resource types.Resource) error {
	headlessAuthn, ok := resource.(*types.HeadlessAuthentication)
	if !ok {
		return trace.BadParameter("unexpected type %T", resource)
	}
	return cache.headlessAuthenticationsCache.DeleteHeadlessAuthentication(ctx, headlessAuthn.User, headlessAuthn.GetName())
}

func (headlessAuthenticationServiceExecutor) deleteAll(ctx context.Context, cache *Cache) error {
	return cache.headlessAuthenticationsCache.DeleteAllHeadlessAuthentications(ctx)
}

// isSingleton reports that headless authentications are not a singleton resource.
func (headlessAuthenticationServiceExecutor) isSingleton() bool { return false }

// getReader returns noReader; this executor is never read through the cache.
func (headlessAuthenticationServiceExecutor) getReader(_ *Cache, _ bool) noReader {
	return noReader{}
}

var _ executor[*types.HeadlessAuthentication, noReader] = headlessAuthenticationServiceExecutor{}

type accessListsExecutor struct{}

func (accessListsExecutor) getAll(ctx context.Context, cache *Cache, loadSecrets bool) ([]*accesslist.AccessList, error) {
Expand Down Expand Up @@ -2369,3 +2340,31 @@ func (accessListsExecutor) getReader(cache *Cache, cacheOK bool) services.Access
}

var _ executor[*accesslist.AccessList, services.AccessListsGetter] = accessListsExecutor{}

// noopExecutor is an executor whose collection methods all succeed without
// doing anything. It can be used for resource kinds whose events only need to
// be fanned out to downstream watchers and never stored in the cache itself.
type noopExecutor struct{}

// getAll returns no resources; nothing is ever stored for this kind.
func (noopExecutor) getAll(ctx context.Context, cache *Cache, loadSecrets bool) ([]*types.HeadlessAuthentication, error) {
	return nil, nil
}

// upsert is a no-op.
func (noopExecutor) upsert(ctx context.Context, cache *Cache, resource *types.HeadlessAuthentication) error {
	return nil
}

// deleteAll is a no-op.
func (noopExecutor) deleteAll(ctx context.Context, cache *Cache) error {
	return nil
}

// delete is a no-op.
func (noopExecutor) delete(ctx context.Context, cache *Cache, resource types.Resource) error {
	return nil
}

// isSingleton reports that this kind is not a singleton resource.
func (noopExecutor) isSingleton() bool { return false }

// getReader returns noReader; this executor is never read through the cache.
func (noopExecutor) getReader(_ *Cache, _ bool) noReader {
	return noReader{}
}

var _ executor[*types.HeadlessAuthentication, noReader] = noopExecutor{}
2 changes: 1 addition & 1 deletion lib/services/local/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -1582,7 +1582,7 @@ type headlessAuthenticationParser struct {
func (p *headlessAuthenticationParser) parse(event backend.Event) (types.Resource, error) {
switch event.Type {
case types.OpDelete:
return resourceHeader(event, types.KindIntegration, types.V1, 0)
return resourceHeader(event, types.KindHeadlessAuthentication, types.V1, 0)
case types.OpPut:
ha, err := unmarshalHeadlessAuthentication(event.Item.Value)
if err != nil {
Expand Down
38 changes: 38 additions & 0 deletions lib/teleterm/clusters/cluster_headless.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,44 @@ func (c *Cluster) WatchPendingHeadlessAuthentications(ctx context.Context) (watc
return watcher, close, trace.Wrap(err)
}

// WatchHeadlessAuthentications watches the backend for headless authentication
// events for the user.
//
// On success, the caller must invoke the returned close function to release the
// watcher and the underlying proxy/root cluster connections.
func (c *Cluster) WatchHeadlessAuthentications(ctx context.Context) (watcher types.Watcher, close func(), err error) {
	proxyClient, err := c.clusterClient.ConnectToProxy(ctx)
	if err != nil {
		return nil, nil, trace.Wrap(err)
	}

	rootClient, err := proxyClient.ConnectToRootCluster(ctx)
	if err != nil {
		proxyClient.Close()
		return nil, nil, trace.Wrap(err)
	}

	// Only watch headless authentications belonging to this user.
	watch := types.Watch{
		Kinds: []types.WatchKind{{
			Kind: types.KindHeadlessAuthentication,
			Filter: (&types.HeadlessAuthenticationFilter{
				Username: c.clusterClient.Username,
			}).IntoMap(),
		}},
	}

	watcher, err = rootClient.NewWatcher(ctx, watch)
	if err != nil {
		proxyClient.Close()
		rootClient.Close()
		return nil, nil, trace.Wrap(err)
	}

	close = func() {
		watcher.Close()
		proxyClient.Close()
		rootClient.Close()
	}

	// err is necessarily nil at this point; return nil explicitly instead of
	// wrapping a known-nil error.
	return watcher, close, nil
}

// UpdateHeadlessAuthenticationState updates the headless authentication matching the given id to the given state.
// MFA will be prompted when updating to the approve state.
func (c *Cluster) UpdateHeadlessAuthenticationState(ctx context.Context, headlessID string, state types.HeadlessAuthenticationState) error {
Expand Down
70 changes: 62 additions & 8 deletions lib/teleterm/daemon/daemon.go
Comment thread
Joerger marked this conversation as resolved.
Outdated
Comment thread
Joerger marked this conversation as resolved.
Outdated
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I execute a headless login and then approve it outside of Connect (or in another instance of Connect), Connect will continue to show the modal. I guess that's because the loop in tshd will be stuck on s.tshdEventsClient.SendPendingHeadlessAuthentication.

How hard would it be to abort that request when we detect that the auth prompt has been approved? I think it wouldn't need to be addressed in this PR, but it feels like something we should implement sooner rather than later.

In this PR, we should at least add a timeout to the request, I feel like a minute would be enough?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How hard would it be to abort that request when we detect that the auth prompt has been approved?

The more I think about it, the more doable it seems. We'd basically need to set up a new watcher using rpc WatchEvents to watch specifically for approve/deny events matching the headless ID. I'll give it an attempt.

I also think having a timeout (3m callback timeout to match the max lifetime of a headless request) is a must have, thanks for suggesting it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fb8b2efa8a4cb623b896f26feae3cc52dc9cecda

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both the relogin logic in gateways and the headless watcher show an important modal in Connect and focus the app. An important modal is going to cover a regular modal shown in the app.

The Electron app doesn't guard against attempting to show two important modals at the same time. Calling modalsService.openImportantDialog twice will simply remove the first modal and replace it with the new one.

Because the relogin logic in gateways and the headless watcher don't coordinate with each other, it's possible for them to overwrite each other. I don't think that should be the case: in a situation where you requested a headless login somewhere and the app was brought to focus, if a DB GUI client attempts to relogin, it should not overwrite the headless authn modal.

The relogin logic is already written in a way that assumes it might not be able to lock a mutex and will instead fail immediately. I think we could do the same for headless authn. It should boil down to renaming reloginMu to importantModalMu, calling TryLock on it before calling tshdEventsClient.SendPendingHeadlessAuthentication and then logging and ignoring a failed attempt at obtaining a lock.

What do you think about this? Both relogin and headless authn occurring at the same time are rather unlikely, but it would set out a good pattern to follow for any other features using important modals in tshd.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, after reading Grzegorz's comment about two requests in a row, I realized that it's possible for two headless requests from two different clusters to override each other.

Copy link
Copy Markdown
Contributor Author

@Joerger Joerger Jul 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that we should add importantModalMu to fix the issue, but I don't actually think we should replace reloginMu.

IIUC, there is a second reason for reloginMu - to trigger relogin just once even if multiple gateways in the same cluster trigger relogin at the same time. In this case, TryLock makes sense, and we should keep reloginMu for the TryLock flow.

In general, we want to use a normal importantModalMu.Lock so that multiple important modals can be displayed back to back, without any being dropped.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

eff189353ea587d662a89f81d2e0c7436b759061

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense. I'm just thinking about the name: tshdEventsClientMu makes it seem like it should guard every call to tshdEventsClient. I feel like importantModalMu would be more clear in this case, but I see you dropped that name in favor of tshdEventsClient.

Copy link
Copy Markdown
Contributor Author

@Joerger Joerger Jul 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I named it tshdEventsClientMu to include the tshdEventsClient.SendNotification, but if you think this is only needed for important modals, i will remove it from SendNotification and rename it.

0fa2e3d964ed966fc3569c171a9fb2ea9c777c2d

Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,20 @@ import (
usagereporter "github.com/gravitational/teleport/lib/usagereporter/daemon"
)

// tshdEventsTimeout is the maximum amount of time the gRPC client managed by the tshd daemon will
// wait for a response from the tshd events server managed by the Electron app. This timeout
// should be used for quick one-off calls where the client doesn't need the server or the user to
// perform any additional work, such as the SendNotification RPC.
const tshdEventsTimeout = time.Second
const (
	// tshdEventsTimeout is the maximum amount of time the gRPC client managed by the tshd daemon will
	// wait for a response from the tshd events server managed by the Electron app. This timeout
	// should be used for quick one-off calls where the client doesn't need the server or the user to
	// perform any additional work, such as the SendNotification RPC.
	tshdEventsTimeout = time.Second

	// imporantModalWaitDuraiton is the amount of time to wait between sending tshd events that
	// display important modals in the Electron App. This ensures a clear transition between modals.
	//
	// TODO(review): the identifier misspells "importantModalWaitDuration"; rename it together with
	// its usage sites in a follow-up.
	imporantModalWaitDuraiton = time.Second / 2

	// maxConcurrentImportantModals is the number of important modals that may be displayed at
	// once. The Electron App can only display one important modal at a time.
	maxConcurrentImportantModals = 1
)

// New creates an instance of Daemon service
func New(cfg Config) (*Service, error) {
Expand Down Expand Up @@ -68,13 +77,18 @@ func New(cfg Config) (*Service, error) {

// relogin makes the Electron app display a login modal to trigger re-login.
func (s *Service) relogin(ctx context.Context, req *api.ReloginRequest) error {
// The Electron app cannot display two login modals at the same time, so we have to cut short any
// concurrent relogin requests.
// Relogin may be triggered by multiple gateways simultaneously. To prevent
// redundant relogin requests, cut short additional relogin requests.
if !s.reloginMu.TryLock() {
return trace.AlreadyExists("another relogin request is in progress")
}
defer s.reloginMu.Unlock()

if err := s.importantModalSemaphore.Acquire(ctx); err != nil {
return trace.Wrap(err)
}
defer s.importantModalSemaphore.Release()

const reloginUserTimeout = time.Minute
timeoutCtx, cancelTshdEventsCtx := context.WithTimeout(ctx, reloginUserTimeout)
defer cancelTshdEventsCtx()
Expand Down Expand Up @@ -223,7 +237,7 @@ func (s *Service) ClusterLogout(ctx context.Context, uri string) error {
return trace.Wrap(err)
}

if err := s.StopHeadlessWatcher(uri); err != nil {
if err := s.StopHeadlessWatcher(uri); err != nil && !trace.IsNotFound(err) {
return trace.Wrap(err)
}

Expand Down Expand Up @@ -671,6 +685,7 @@ func (s *Service) UpdateAndDialTshdEventsServerAddress(serverAddress string) err
client := api.NewTshdEventsServiceClient(conn)

s.tshdEventsClient = client
s.importantModalSemaphore = newWaitSemaphore(maxConcurrentImportantModals, imporantModalWaitDuraiton)

// Resume headless watchers for any active login sessions.
if err := s.StartHeadlessWatchers(); err != nil {
Expand Down Expand Up @@ -818,6 +833,15 @@ type Service struct {
gateways map[string]gateway.Gateway
// tshdEventsClient is a client to send events to the Electron App.
tshdEventsClient api.TshdEventsServiceClient
// The Electron App can only display one important Modal at a time. tshd events
// that trigger an important modal (relogin, headless login) should use this
// lock to ensure it doesn't overwrite existing tshd-initiated important modals.
//
// We use a semaphore instead of a mutex in order to cancel important modals that
// are no longer relevant before acquisition.
//
// We use a waitSemaphore in order to make sure there is a clear transition between modals.
importantModalSemaphore *waitSemaphore
// usageReporter batches the events and sends them to prehog
usageReporter *usagereporter.UsageReporter
// reloginMu is used when a goroutine needs to request a relogin from the Electron app. Since the
Expand All @@ -834,3 +858,33 @@ type CreateGatewayParams struct {
TargetSubresourceName string
LocalPort string
}

// waitSemaphore is a semaphore that enforces a minimum wait duration between a
// Release and the next successful Acquire, giving consumers (e.g. the Electron
// app's important modals) a clear transition between back-to-back acquisitions.
//
// NOTE(review): lastRelease is not guarded by a mutex. With a capacity of one,
// the channel send/receive pair orders each Release before the next Acquire,
// so the unsynchronized access is safe; with a larger capacity the reads and
// writes of lastRelease could race — confirm before raising the capacity.
type waitSemaphore struct {
	// semC holds one token per concurrently-held acquisition.
	semC chan struct{}
	// lastRelease records when the semaphore was last released.
	lastRelease time.Time
	// waitDuration is the minimum time between a release and the next acquire.
	waitDuration time.Duration
}

// newWaitSemaphore returns a semaphore permitting maxConcurrency concurrent
// holders with at least waitDuration between a release and the next acquire.
// lastRelease starts at the zero time, so the very first Acquire never waits.
func newWaitSemaphore(maxConcurrency int, waitDuration time.Duration) *waitSemaphore {
	return &waitSemaphore{
		semC:         make(chan struct{}, maxConcurrency),
		waitDuration: waitDuration,
	}
}

// Acquire blocks until a semaphore slot is free, then waits out the remainder
// of the wait duration since the last release. The wait is interruptible: if
// ctx is canceled while waiting, the slot is returned to the semaphore and
// ctx's error is reported, rather than sleeping out the full duration while
// holding the slot.
func (s *waitSemaphore) Acquire(ctx context.Context) error {
	select {
	case s.semC <- struct{}{}:
		if wait := s.waitDuration - time.Since(s.lastRelease); wait > 0 {
			timer := time.NewTimer(wait)
			defer timer.Stop()
			select {
			case <-timer.C:
			case <-ctx.Done():
				// Give the slot back before bailing out so the
				// semaphore doesn't leak a token.
				<-s.semC
				return trace.Wrap(ctx.Err())
			}
		}
		return nil
	case <-ctx.Done():
		return trace.Wrap(ctx.Err())
	}
}

// Release records the release time and frees a semaphore slot.
func (s *waitSemaphore) Release() {
	s.lastRelease = time.Now()
	<-s.semC
}
97 changes: 83 additions & 14 deletions lib/teleterm/daemon/daemon_headless.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package daemon

import (
"context"
"strings"
"sync"

"github.com/gravitational/trace"

Expand Down Expand Up @@ -100,17 +102,43 @@ func (s *Service) startHeadlessWatcher(cluster *clusters.Cluster) error {
watchCtx, watchCancel := context.WithCancel(s.closeContext)
s.headlessWatcherClosers[cluster.URI.String()] = watchCancel

log := s.cfg.Log.WithField("cluster", cluster.URI.String())

pendingRequests := make(map[string]context.CancelFunc)
pendingRequestsMu := sync.Mutex{}

cancelPendingRequest := func(name string) {
pendingRequestsMu.Lock()
defer pendingRequestsMu.Unlock()
if cancel, ok := pendingRequests[name]; ok {
cancel()
}
}

addPendingRequest := func(name string, cancel context.CancelFunc) {
pendingRequestsMu.Lock()
defer pendingRequestsMu.Unlock()
pendingRequests[name] = cancel
}

watch := func() error {
watcher, closeWatcher, err := cluster.WatchPendingHeadlessAuthentications(watchCtx)
pendingWatcher, closePendingWatcher, err := cluster.WatchPendingHeadlessAuthentications(watchCtx)
if err != nil {
return trace.Wrap(err)
}
defer closePendingWatcher()

resolutionWatcher, closeResolutionWatcher, err := cluster.WatchHeadlessAuthentications(watchCtx)
if err != nil {
return trace.Wrap(err)
}
defer closeResolutionWatcher()

retry.Reset()

defer closeWatcher()
for {
select {
case event := <-watcher.Events():
case event := <-pendingWatcher.Events():
// Ignore non-put events.
if event.Type != types.OpPut {
continue
Expand All @@ -121,25 +149,49 @@ func (s *Service) startHeadlessWatcher(cluster *clusters.Cluster) error {
return trace.Errorf("headless watcher returned an unexpected resource type %T", event.Resource)
}

// Notify the Electron App of the pending headless authentication to handle resolution.
req := &api.SendPendingHeadlessAuthenticationRequest{
RootClusterUri: cluster.URI.String(),
HeadlessAuthenticationId: ha.GetName(),
HeadlessAuthenticationClientIp: ha.ClientIpAddress,
}
// headless authentication requests will timeout after 3 minutes, so we can close the
// Electron modal once this time is up.
sendCtx, cancelSend := context.WithTimeout(s.closeContext, defaults.CallbackTimeout)

if _, err := s.tshdEventsClient.SendPendingHeadlessAuthentication(watchCtx, req); err != nil {
return trace.Wrap(err)
// Add the pending request to the map so it is canceled early upon resolution.
addPendingRequest(ha.GetName(), cancelSend)

// Notify the Electron App of the pending headless authentication to handle resolution.
// We do this in a goroutine so the watch loop can continue and cancel resolved requests.
go func() {
defer cancelSend()
if err := s.sendPendingHeadlessAuthentication(sendCtx, ha, cluster.URI.String()); err != nil {
if !strings.Contains(err.Error(), context.Canceled.Error()) && !strings.Contains(err.Error(), context.DeadlineExceeded.Error()) {
log.WithError(err).Debug("sendPendingHeadlessAuthentication resulted in unexpected error.")
}
}
}()
case event := <-resolutionWatcher.Events():
// Watch for pending headless authentications to be approved, denied, or deleted (canceled/timeout).
switch event.Type {
case types.OpPut:
ha, ok := event.Resource.(*types.HeadlessAuthentication)
if !ok {
return trace.Errorf("headless watcher returned an unexpected resource type %T", event.Resource)
}

switch ha.State {
case types.HeadlessAuthenticationState_HEADLESS_AUTHENTICATION_STATE_APPROVED, types.HeadlessAuthenticationState_HEADLESS_AUTHENTICATION_STATE_DENIED:
cancelPendingRequest(ha.GetName())
}
case types.OpDelete:
cancelPendingRequest(event.Resource.GetName())
}
case <-watcher.Done():
return trace.Wrap(watcher.Error())
case <-pendingWatcher.Done():
return trace.Wrap(pendingWatcher.Error(), "pending watcher error")
case <-resolutionWatcher.Done():
return trace.Wrap(resolutionWatcher.Error(), "resolution watcher error")
case <-watchCtx.Done():
return nil
}
}
}

log := s.cfg.Log.WithField("cluster", cluster.URI.String())
log.Debugf("Starting headless watch loop.")
go func() {
defer func() {
Expand Down Expand Up @@ -185,6 +237,23 @@ func (s *Service) startHeadlessWatcher(cluster *clusters.Cluster) error {
return nil
}

// sendPendingHeadlessAuthentication notifies the Electron App of a pending headless authentication.
//
// The important-modal semaphore is acquired first so the resulting modal does
// not clobber another important modal already on screen, and is released once
// the RPC completes.
func (s *Service) sendPendingHeadlessAuthentication(ctx context.Context, ha *types.HeadlessAuthentication, clusterURI string) error {
	if err := s.importantModalSemaphore.Acquire(ctx); err != nil {
		return trace.Wrap(err)
	}
	defer s.importantModalSemaphore.Release()

	_, err := s.tshdEventsClient.SendPendingHeadlessAuthentication(ctx, &api.SendPendingHeadlessAuthenticationRequest{
		RootClusterUri:                 clusterURI,
		HeadlessAuthenticationId:       ha.GetName(),
		HeadlessAuthenticationClientIp: ha.ClientIpAddress,
	})
	return trace.Wrap(err)
}

// StopHeadlessWatcher stops the headless watcher for the given cluster URI.
func (s *Service) StopHeadlessWatcher(uri string) error {
s.headlessWatcherClosersMu.Lock()
Expand Down
Loading