From 6d9801423f745ab8815724fc3e879573ba1b415f Mon Sep 17 00:00:00 2001
From: Hugo Hervieux
Date: Mon, 31 Jul 2023 16:05:07 -0400
Subject: [PATCH 1/3] integrations/access: avoid infinite retry on broken connection

This commit changes the watcherjob retry behaviour. Instead of relying
on gRPC's retry mechanism, the plugins will now fail fast when something
happens to the connection. This means the plugin will exit in error more
often, but it won't be stuck in a retry loop, silently swallowing
connection errors.
---
 integrations/access/common/config.go      |  3 ---
 integrations/lib/watcherjob/watcherjob.go | 22 ++++++++++++++++++++++
 2 files changed, 22 insertions(+), 3 deletions(-)

diff --git a/integrations/access/common/config.go b/integrations/access/common/config.go
index af28e8757de76..2d88fb100b448 100644
--- a/integrations/access/common/config.go
+++ b/integrations/access/common/config.go
@@ -69,9 +69,6 @@ func (c BaseConfig) GetTeleportClient(ctx context.Context) (teleport.Client, err
 		Credentials: c.Teleport.Credentials(),
 		DialOpts: []grpc.DialOption{
 			grpc.WithConnectParams(grpc.ConnectParams{Backoff: bk, MinConnectTimeout: initTimeout}),
-			grpc.WithDefaultCallOptions(
-				grpc.WaitForReady(true),
-			),
 			grpc.WithReturnConnectionError(),
 		},
 	})
diff --git a/integrations/lib/watcherjob/watcherjob.go b/integrations/lib/watcherjob/watcherjob.go
index aae66c49648ef..499f2d60dae84 100644
--- a/integrations/lib/watcherjob/watcherjob.go
+++ b/integrations/lib/watcherjob/watcherjob.go
@@ -21,10 +21,12 @@ import (
 	"time"
 
 	"github.com/gravitational/trace"
+	"github.com/jonboulle/clockwork"
 
 	"github.com/gravitational/teleport/api/types"
 	"github.com/gravitational/teleport/integrations/access/common/teleport"
 	"github.com/gravitational/teleport/integrations/lib"
+	"github.com/gravitational/teleport/integrations/lib/backoff"
 	"github.com/gravitational/teleport/integrations/lib/logger"
 )
 
@@ -83,13 +85,27 @@ func NewJobWithEvents(events types.Events, config Config, fn EventFunc) lib.Serv
 			return nil
 		})
 
+		bk := backoff.NewDecorr(20*time.Millisecond, 5*time.Second, clockwork.NewRealClock())
+		log := logger.Get(ctx)
 		for {
 			err := job.watchEvents(ctx)
+			// We are not supporting liveness/readiness probes yet, but if we do,
+			// it would make sense to use the job's readiness.
+			job.SetReady(false)
+
 			switch {
 			case trace.IsConnectionProblem(err):
-				log.WithError(err).Error("Failed to connect to Teleport Auth server. Reconnecting...")
+				// Not all connection problems can be retried. The client can
+				// end up in a broken state and won't be able to connect.
+				// Exiting in error is noisier but allows the orchestrator to
+				// know something is not right.
+				return trace.WrapWithMessage(err, "Failed to connect to Teleport server. Exiting.")
 			case errors.Is(err, io.EOF):
 				log.WithError(err).Error("Watcher stream closed. Reconnecting...")
 			case lib.IsCanceled(err):
 				log.Debug("Watcher context is canceled")
@@ -99,6 +115,12 @@ func NewJobWithEvents(events types.Events, config Config, fn EventFunc) lib.Serv
 			log.WithError(err).Error("Watcher event loop failed")
 			return trace.Wrap(err)
 		}
+
+		// To mitigate a potentially aggressive retry loop, we wait before
+		// reconnecting.
+		if err := bk.Do(ctx); err != nil {
+			log.Debug("Watcher context was canceled while waiting before a reconnection")
+			return nil
+		}
 	}
 })
 return job
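Note: backoff.NewDecorr above is a decorrelated-jitter backoff: each wait is drawn at random between the base delay and three times the previous wait, capped at the maximum, which spreads reconnection attempts out instead of synchronizing them. The self-contained Go sketch below illustrates the behaviour assumed here; it is an approximation for illustration, not the actual implementation in integrations/lib/backoff, and all names in it are hypothetical:

    package backoffsketch

    import (
        "context"
        "math/rand"
        "time"
    )

    // Decorr approximates decorrelated-jitter backoff: each wait is a random
    // duration in [base, 3*previous wait), capped at max.
    type Decorr struct {
        base, max, sleep time.Duration
    }

    func NewDecorr(base, max time.Duration) *Decorr {
        return &Decorr{base: base, max: max, sleep: base}
    }

    // Do blocks for the next backoff interval, or returns early with the
    // context's error if ctx is canceled while waiting.
    func (d *Decorr) Do(ctx context.Context) error {
        d.sleep = d.base + time.Duration(rand.Int63n(int64(3*d.sleep-d.base)))
        if d.sleep > d.max {
            d.sleep = d.max
        }
        select {
        case <-time.After(d.sleep):
            return nil
        case <-ctx.Done():
            return ctx.Err()
        }
    }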
Reconnecting...") case lib.IsCanceled(err): log.Debug("Watcher context is canceled") @@ -99,6 +115,12 @@ func NewJobWithEvents(events types.Events, config Config, fn EventFunc) lib.Serv log.WithError(err).Error("Watcher event loop failed") return trace.Wrap(err) } + + // To mitigate a potentially aggressive retry loop, we wait + if err := bk.Do(ctx); err != nil { + log.Debug("Watcher context was canceled while waiting before a reconnection") + return nil + } } }) return job From f56a732fb8b42220d771580680f15f9f7112e9d2 Mon Sep 17 00:00:00 2001 From: Hugo Shaka Date: Tue, 8 Aug 2023 17:46:02 -0400 Subject: [PATCH 2/3] Add TELEPORT_PLUGIN_FAIL_FAST environment variable --- integrations/access/common/app.go | 5 ++- integrations/access/jira/app.go | 5 ++- integrations/access/pagerduty/app.go | 5 ++- integrations/lib/watcherjob/helpers_test.go | 4 +- integrations/lib/watcherjob/watcherjob.go | 42 +++++++++++++-------- 5 files changed, 41 insertions(+), 20 deletions(-) diff --git a/integrations/access/common/app.go b/integrations/access/common/app.go index be85eeb59e0fc..219b86c26d2ca 100644 --- a/integrations/access/common/app.go +++ b/integrations/access/common/app.go @@ -177,7 +177,7 @@ func (a *BaseApp) run(ctx context.Context) error { if err := a.init(ctx); err != nil { return trace.Wrap(err) } - watcherJob := watcherjob.NewJob( + watcherJob, err := watcherjob.NewJob( a.apiClient, watcherjob.Config{ Watch: types.Watch{Kinds: []types.WatchKind{{Kind: types.KindAccessRequest}}}, @@ -185,6 +185,9 @@ func (a *BaseApp) run(ctx context.Context) error { }, a.onWatcherEvent, ) + if err != nil { + return trace.Wrap(err) + } a.SpawnCriticalJob(watcherJob) ok, err := watcherJob.WaitReady(ctx) if err != nil { diff --git a/integrations/access/jira/app.go b/integrations/access/jira/app.go index 1f82b060b9a7e..905af395802e3 100644 --- a/integrations/access/jira/app.go +++ b/integrations/access/jira/app.go @@ -135,7 +135,7 @@ func (a *App) run(ctx context.Context) error { } } - watcherJob := watcherjob.NewJob( + watcherJob, err := watcherjob.NewJob( a.teleport, watcherjob.Config{ Watch: types.Watch{Kinds: []types.WatchKind{types.WatchKind{Kind: types.KindAccessRequest}}}, @@ -143,6 +143,9 @@ func (a *App) run(ctx context.Context) error { }, a.onWatcherEvent, ) + if err != nil { + return trace.Wrap(err) + } a.SpawnCriticalJob(watcherJob) watcherOk, err := watcherJob.WaitReady(ctx) if err != nil { diff --git a/integrations/access/pagerduty/app.go b/integrations/access/pagerduty/app.go index ccc42421afa65..2f7a59a919e0e 100644 --- a/integrations/access/pagerduty/app.go +++ b/integrations/access/pagerduty/app.go @@ -113,7 +113,7 @@ func (a *App) run(ctx context.Context) error { return trace.Wrap(err) } - watcherJob := watcherjob.NewJob( + watcherJob, err := watcherjob.NewJob( a.teleport, watcherjob.Config{ Watch: types.Watch{Kinds: []types.WatchKind{types.WatchKind{Kind: types.KindAccessRequest}}}, @@ -121,6 +121,9 @@ func (a *App) run(ctx context.Context) error { }, a.onWatcherEvent, ) + if err != nil { + return trace.Wrap(err) + } a.SpawnCriticalJob(watcherJob) ok, err := watcherJob.WaitReady(ctx) if err != nil { diff --git a/integrations/lib/watcherjob/helpers_test.go b/integrations/lib/watcherjob/helpers_test.go index 90ceb59a0bf7a..e32e433b415f9 100644 --- a/integrations/lib/watcherjob/helpers_test.go +++ b/integrations/lib/watcherjob/helpers_test.go @@ -119,7 +119,9 @@ func NewMockEventsProcess(ctx context.Context, t *testing.T, config Config, fn E assert.NoError(t, process.Shutdown(ctx)) process.Close() 
diff --git a/integrations/lib/watcherjob/helpers_test.go b/integrations/lib/watcherjob/helpers_test.go
index 90ceb59a0bf7a..e32e433b415f9 100644
--- a/integrations/lib/watcherjob/helpers_test.go
+++ b/integrations/lib/watcherjob/helpers_test.go
@@ -119,7 +119,9 @@ func NewMockEventsProcess(ctx context.Context, t *testing.T, config Config, fn E
 		assert.NoError(t, process.Shutdown(ctx))
 		process.Close()
 	})
-	process.eventsJob = NewJobWithEvents(&process.Events, config, fn)
+	var err error
+	process.eventsJob, err = NewJobWithEvents(&process.Events, config, fn)
+	require.NoError(t, err)
 	process.SpawnCriticalJob(process.eventsJob)
 	require.NoError(t, process.Events.WaitSomeWatchers(ctx))
 	process.Events.Fire(types.Event{Type: types.OpInit})
diff --git a/integrations/lib/watcherjob/watcherjob.go b/integrations/lib/watcherjob/watcherjob.go
index 499f2d60dae84..7873a3372e269 100644
--- a/integrations/lib/watcherjob/watcherjob.go
+++ b/integrations/lib/watcherjob/watcherjob.go
@@ -18,6 +18,8 @@ import (
 	"context"
 	"errors"
 	"io"
+	"os"
+	"strconv"
 	"time"
 
 	"github.com/gravitational/trace"
@@ -30,8 +32,11 @@ import (
 	"github.com/gravitational/teleport/integrations/lib/logger"
 )
 
-const DefaultMaxConcurrency = 128
-const DefaultEventFuncTimeout = time.Second * 5
+const (
+	DefaultMaxConcurrency   = 128
+	DefaultEventFuncTimeout = time.Second * 5
+	failFastEnvVarName      = "TELEPORT_PLUGIN_FAIL_FAST"
+)
 
 type EventFunc func(context.Context, types.Event) error
 
@@ -39,6 +44,7 @@ type Config struct {
 	Watch            types.Watch
 	MaxConcurrency   int
 	EventFuncTimeout time.Duration
+	FailFast         bool
 }
 
 type job struct {
@@ -54,17 +60,24 @@ type eventKey struct {
 	name string
 }
 
-func NewJob(client teleport.Client, config Config, fn EventFunc) lib.ServiceJob {
+func NewJob(client teleport.Client, config Config, fn EventFunc) (lib.ServiceJob, error) {
 	return NewJobWithEvents(client, config, fn)
 }
 
-func NewJobWithEvents(events types.Events, config Config, fn EventFunc) lib.ServiceJob {
+func NewJobWithEvents(events types.Events, config Config, fn EventFunc) (lib.ServiceJob, error) {
 	if config.MaxConcurrency == 0 {
 		config.MaxConcurrency = DefaultMaxConcurrency
 	}
 	if config.EventFuncTimeout == 0 {
 		config.EventFuncTimeout = DefaultEventFuncTimeout
 	}
+	if flagVar := os.Getenv(failFastEnvVarName); !config.FailFast && flagVar != "" {
+		flag, err := strconv.ParseBool(flagVar)
+		if err != nil {
+			return nil, trace.WrapWithMessage(err, "failed to parse content '%s' of the %s environment variable", flagVar, failFastEnvVarName)
+		}
+		config.FailFast = flag
+	}
 	job := job{
 		events: events,
 		config: config,
@@ -95,18 +108,15 @@ func NewJobWithEvents(events types.Events, config Config, fn EventFunc) lib.Serv
 			switch {
 			case trace.IsConnectionProblem(err):
-				// Not all connection problems can be retried. The client can
-				// end up in a broken state and won't be able to connect.
-				// Exiting in error is noisier but allows the orchestrator to
-				// know something is not right.
-				return trace.WrapWithMessage(err, "Failed to connect to Teleport server. Exiting.")
+				if config.FailFast {
+					return trace.WrapWithMessage(err, "Connection problem detected. Exiting as fail fast is on.")
+				}
+				log.WithError(err).Error("Connection problem detected. Attempting to reconnect.")
 			case errors.Is(err, io.EOF):
-				log.WithError(err).Error("Watcher stream closed. Reconnecting...")
+				if config.FailFast {
+					return trace.WrapWithMessage(err, "Watcher stream closed. Exiting as fail fast is on.")
+				}
+				log.WithError(err).Error("Watcher stream closed. Attempting to reconnect.")
 			case lib.IsCanceled(err):
 				log.Debug("Watcher context is canceled")
 				// Context cancellation is not an error
@@ -123,7 +133,7 @@ func NewJobWithEvents(events types.Events, config Config, fn EventFunc) lib.Serv
 			}
 		}
 	})
-	return job
+	return job, nil
 }
 
 // watchEvents spawns a watcher and reads events from it.
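Because the environment variable is parsed with strconv.ParseBool, only Go's boolean literals are accepted: "1", "t", "T", "TRUE", "true", and "True" enable fail-fast mode; "0", "f", "F", "FALSE", "false", and "False" disable it. Any other value surfaces as a constructor error instead of being silently ignored. A sketch (client and onWatcherEvent are placeholders):

    os.Setenv("TELEPORT_PLUGIN_FAIL_FAST", "yes") // "yes" is not a valid ParseBool input
    _, err := watcherjob.NewJob(client, watcherjob.Config{}, onWatcherEvent)
    // err != nil: failed to parse content 'yes' of the
    // TELEPORT_PLUGIN_FAIL_FAST environment variable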
Attempting to reconnect.") case lib.IsCanceled(err): log.Debug("Watcher context is canceled") // Context cancellation is not an error @@ -123,7 +133,7 @@ func NewJobWithEvents(events types.Events, config Config, fn EventFunc) lib.Serv } } }) - return job + return job, nil } // watchEvents spawns a watcher and reads events from it. From f409af0856f3aaf78bbb48d0d3664fda5b4ac0cd Mon Sep 17 00:00:00 2001 From: Hugo Shaka Date: Wed, 9 Aug 2023 10:35:25 -0400 Subject: [PATCH 3/3] fixup! Add TELEPORT_PLUGIN_FAIL_FAST environment variable --- integrations/access/opsgenie/app.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/integrations/access/opsgenie/app.go b/integrations/access/opsgenie/app.go index edbefa1dfe109..7395a7dbba3d9 100644 --- a/integrations/access/opsgenie/app.go +++ b/integrations/access/opsgenie/app.go @@ -115,7 +115,7 @@ func (a *App) run(ctx context.Context) error { return trace.Wrap(err) } - watcherJob := watcherjob.NewJob( + watcherJob, err := watcherjob.NewJob( a.teleport, watcherjob.Config{ Watch: types.Watch{Kinds: []types.WatchKind{types.WatchKind{Kind: types.KindAccessRequest}}}, @@ -123,6 +123,9 @@ func (a *App) run(ctx context.Context) error { }, a.onWatcherEvent, ) + if err != nil { + return trace.Wrap(err) + } a.SpawnCriticalJob(watcherJob) ok, err := watcherJob.WaitReady(ctx) if err != nil {