diff --git a/integrations/access/common/app.go b/integrations/access/common/app.go
index 0d1c5c8c42f48..100f066501d7b 100644
--- a/integrations/access/common/app.go
+++ b/integrations/access/common/app.go
@@ -177,7 +177,7 @@ func (a *BaseApp) run(ctx context.Context) error {
 	if err := a.init(ctx); err != nil {
 		return trace.Wrap(err)
 	}
-	watcherJob := watcherjob.NewJob(
+	watcherJob, err := watcherjob.NewJob(
 		a.apiClient,
 		watcherjob.Config{
 			Watch: types.Watch{Kinds: []types.WatchKind{{Kind: types.KindAccessRequest}}},
@@ -185,6 +185,9 @@ func (a *BaseApp) run(ctx context.Context) error {
 		},
 		a.onWatcherEvent,
 	)
+	if err != nil {
+		return trace.Wrap(err)
+	}
 	a.SpawnCriticalJob(watcherJob)
 	ok, err := watcherJob.WaitReady(ctx)
 	if err != nil {
diff --git a/integrations/access/common/config.go b/integrations/access/common/config.go
index af28e8757de76..2d88fb100b448 100644
--- a/integrations/access/common/config.go
+++ b/integrations/access/common/config.go
@@ -69,9 +69,6 @@ func (c BaseConfig) GetTeleportClient(ctx context.Context) (teleport.Client, err
 		Credentials: c.Teleport.Credentials(),
 		DialOpts: []grpc.DialOption{
 			grpc.WithConnectParams(grpc.ConnectParams{Backoff: bk, MinConnectTimeout: initTimeout}),
-			grpc.WithDefaultCallOptions(
-				grpc.WaitForReady(true),
-			),
 			grpc.WithReturnConnectionError(),
 		},
 	})
diff --git a/integrations/access/jira/app.go b/integrations/access/jira/app.go
index 1f82b060b9a7e..905af395802e3 100644
--- a/integrations/access/jira/app.go
+++ b/integrations/access/jira/app.go
@@ -135,7 +135,7 @@ func (a *App) run(ctx context.Context) error {
 		}
 	}
 
-	watcherJob := watcherjob.NewJob(
+	watcherJob, err := watcherjob.NewJob(
 		a.teleport,
 		watcherjob.Config{
 			Watch: types.Watch{Kinds: []types.WatchKind{types.WatchKind{Kind: types.KindAccessRequest}}},
@@ -143,6 +143,9 @@ func (a *App) run(ctx context.Context) error {
 		},
 		a.onWatcherEvent,
 	)
+	if err != nil {
+		return trace.Wrap(err)
+	}
 	a.SpawnCriticalJob(watcherJob)
 	watcherOk, err := watcherJob.WaitReady(ctx)
 	if err != nil {
diff --git a/integrations/access/opsgenie/app.go b/integrations/access/opsgenie/app.go
index edbefa1dfe109..7395a7dbba3d9 100644
--- a/integrations/access/opsgenie/app.go
+++ b/integrations/access/opsgenie/app.go
@@ -115,7 +115,7 @@ func (a *App) run(ctx context.Context) error {
 		return trace.Wrap(err)
 	}
 
-	watcherJob := watcherjob.NewJob(
+	watcherJob, err := watcherjob.NewJob(
 		a.teleport,
 		watcherjob.Config{
 			Watch: types.Watch{Kinds: []types.WatchKind{types.WatchKind{Kind: types.KindAccessRequest}}},
@@ -123,6 +123,9 @@ func (a *App) run(ctx context.Context) error {
 		},
 		a.onWatcherEvent,
 	)
+	if err != nil {
+		return trace.Wrap(err)
+	}
 	a.SpawnCriticalJob(watcherJob)
 	ok, err := watcherJob.WaitReady(ctx)
 	if err != nil {
diff --git a/integrations/access/pagerduty/app.go b/integrations/access/pagerduty/app.go
index ccc42421afa65..2f7a59a919e0e 100644
--- a/integrations/access/pagerduty/app.go
+++ b/integrations/access/pagerduty/app.go
@@ -113,7 +113,7 @@ func (a *App) run(ctx context.Context) error {
 		return trace.Wrap(err)
 	}
 
-	watcherJob := watcherjob.NewJob(
+	watcherJob, err := watcherjob.NewJob(
 		a.teleport,
 		watcherjob.Config{
 			Watch: types.Watch{Kinds: []types.WatchKind{types.WatchKind{Kind: types.KindAccessRequest}}},
@@ -121,6 +121,9 @@ func (a *App) run(ctx context.Context) error {
 		},
 		a.onWatcherEvent,
 	)
+	if err != nil {
+		return trace.Wrap(err)
+	}
 	a.SpawnCriticalJob(watcherJob)
 	ok, err := watcherJob.WaitReady(ctx)
 	if err != nil {
diff --git a/integrations/lib/watcherjob/helpers_test.go b/integrations/lib/watcherjob/helpers_test.go
index 90ceb59a0bf7a..e32e433b415f9 100644
--- a/integrations/lib/watcherjob/helpers_test.go
+++ b/integrations/lib/watcherjob/helpers_test.go
@@ -119,7 +119,9 @@ func NewMockEventsProcess(ctx context.Context, t *testing.T, config Config, fn E
 		assert.NoError(t, process.Shutdown(ctx))
 		process.Close()
 	})
-	process.eventsJob = NewJobWithEvents(&process.Events, config, fn)
+	var err error
+	process.eventsJob, err = NewJobWithEvents(&process.Events, config, fn)
+	require.NoError(t, err)
 	process.SpawnCriticalJob(process.eventsJob)
 	require.NoError(t, process.Events.WaitSomeWatchers(ctx))
 	process.Events.Fire(types.Event{Type: types.OpInit})
diff --git a/integrations/lib/watcherjob/watcherjob.go b/integrations/lib/watcherjob/watcherjob.go
index aae66c49648ef..7873a3372e269 100644
--- a/integrations/lib/watcherjob/watcherjob.go
+++ b/integrations/lib/watcherjob/watcherjob.go
@@ -18,18 +18,25 @@ import (
 	"context"
 	"errors"
 	"io"
+	"os"
+	"strconv"
 	"time"
 
 	"github.com/gravitational/trace"
+	"github.com/jonboulle/clockwork"
 
 	"github.com/gravitational/teleport/api/types"
 	"github.com/gravitational/teleport/integrations/access/common/teleport"
 	"github.com/gravitational/teleport/integrations/lib"
+	"github.com/gravitational/teleport/integrations/lib/backoff"
 	"github.com/gravitational/teleport/integrations/lib/logger"
 )
 
-const DefaultMaxConcurrency = 128
-const DefaultEventFuncTimeout = time.Second * 5
+const (
+	DefaultMaxConcurrency   = 128
+	DefaultEventFuncTimeout = time.Second * 5
+	failFastEnvVarName      = "TELEPORT_PLUGIN_FAIL_FAST"
+)
 
 type EventFunc func(context.Context, types.Event) error
 
@@ -37,6 +44,7 @@ type Config struct {
 	Watch            types.Watch
 	MaxConcurrency   int
 	EventFuncTimeout time.Duration
+	FailFast         bool
 }
 
 type job struct {
@@ -52,17 +60,24 @@ type eventKey struct {
 	name string
 }
 
-func NewJob(client teleport.Client, config Config, fn EventFunc) lib.ServiceJob {
+func NewJob(client teleport.Client, config Config, fn EventFunc) (lib.ServiceJob, error) {
 	return NewJobWithEvents(client, config, fn)
 }
 
-func NewJobWithEvents(events types.Events, config Config, fn EventFunc) lib.ServiceJob {
+func NewJobWithEvents(events types.Events, config Config, fn EventFunc) (lib.ServiceJob, error) {
 	if config.MaxConcurrency == 0 {
 		config.MaxConcurrency = DefaultMaxConcurrency
 	}
 	if config.EventFuncTimeout == 0 {
 		config.EventFuncTimeout = DefaultEventFuncTimeout
 	}
+	if flagVar := os.Getenv(failFastEnvVarName); !config.FailFast && flagVar != "" {
+		flag, err := strconv.ParseBool(flagVar)
+		if err != nil {
+			return nil, trace.WrapWithMessage(err, "failed to parse content '%s' of the %s environment variable", flagVar, failFastEnvVarName)
+		}
+		config.FailFast = flag
+	}
 	job := job{
 		events: events,
 		config: config,
@@ -83,14 +98,25 @@ func NewJobWithEvents(events types.Events, config Config, fn EventFunc) lib.Serv
 			return nil
 		})
 
+		bk := backoff.NewDecorr(20*time.Millisecond, 5*time.Second, clockwork.NewRealClock())
+
 		log := logger.Get(ctx)
 		for {
 			err := job.watchEvents(ctx)
+			// We are not supporting liveness/readiness yet, but if we do it would make sense to use job's readiness
+			job.SetReady(false)
+
 			switch {
 			case trace.IsConnectionProblem(err):
-				log.WithError(err).Error("Failed to connect to Teleport Auth server. Reconnecting...")
+				if config.FailFast {
+					return trace.WrapWithMessage(err, "Connection problem detected. Exiting as fail fast is on.")
+				}
+				log.WithError(err).Error("Connection problem detected. Attempting to reconnect.")
 			case errors.Is(err, io.EOF):
-				log.WithError(err).Error("Watcher stream closed. Reconnecting...")
+				if config.FailFast {
+					return trace.WrapWithMessage(err, "Watcher stream closed. Exiting as fail fast is on.")
+				}
+				log.WithError(err).Error("Watcher stream closed. Attempting to reconnect.")
 			case lib.IsCanceled(err):
 				log.Debug("Watcher context is canceled")
 				// Context cancellation is not an error
@@ -99,9 +125,15 @@ func NewJobWithEvents(events types.Events, config Config, fn EventFunc) lib.Serv
 				log.WithError(err).Error("Watcher event loop failed")
 				return trace.Wrap(err)
 			}
+
+			// To mitigate a potentially aggressive retry loop, we wait
+			if err := bk.Do(ctx); err != nil {
+				log.Debug("Watcher context was canceled while waiting before a reconnection")
+				return nil
+			}
 		}
 	})
-	return job
+	return job, nil
 }
 
 // watchEvents spawns a watcher and reads events from it.
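
For reviewers trying this out, here is a minimal usage sketch (not part of the diff) of how a caller consumes the new two-value `watcherjob.NewJob` signature and opts in to fail-fast. The `startWatcher` helper and its `spawn` parameter are illustrative stand-ins for the plugins' `run` methods and `SpawnCriticalJob`; the types and import paths are the ones used in the patch above.

```go
// Illustrative sketch, not part of this diff: wiring a watcher job with the
// new error-returning constructor and the FailFast option added above.
package example

import (
	"github.com/gravitational/trace"

	"github.com/gravitational/teleport/api/types"
	"github.com/gravitational/teleport/integrations/access/common/teleport"
	"github.com/gravitational/teleport/integrations/lib"
	"github.com/gravitational/teleport/integrations/lib/watcherjob"
)

// startWatcher stands in for a plugin's run method; spawn stands in for
// the app's SpawnCriticalJob.
func startWatcher(client teleport.Client, spawn func(lib.ServiceJob), onEvent watcherjob.EventFunc) error {
	// NewJob now returns an error (e.g. when TELEPORT_PLUGIN_FAIL_FAST holds a
	// value that strconv.ParseBool rejects), so the result must be checked.
	watcherJob, err := watcherjob.NewJob(
		client,
		watcherjob.Config{
			Watch: types.Watch{Kinds: []types.WatchKind{{Kind: types.KindAccessRequest}}},
			// FailFast turns connection problems and closed watcher streams into
			// fatal errors instead of entering the reconnect-with-backoff loop.
			FailFast: true,
		},
		onEvent,
	)
	if err != nil {
		return trace.Wrap(err)
	}
	spawn(watcherJob)
	return nil
}
```

Note that the environment variable is only consulted when `Config.FailFast` is left unset (the `!config.FailFast` guard in the parsing code), so `TELEPORT_PLUGIN_FAIL_FAST=true` acts as a deploy-time opt-in for plugins that leave the field at its zero value. With fail-fast on, the process exits on the first connection problem and can be restarted by an external supervisor instead of retrying internally.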