Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion integrations/access/common/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,14 +177,17 @@ func (a *BaseApp) run(ctx context.Context) error {
if err := a.init(ctx); err != nil {
return trace.Wrap(err)
}
watcherJob := watcherjob.NewJob(
watcherJob, err := watcherjob.NewJob(
a.apiClient,
watcherjob.Config{
Watch: types.Watch{Kinds: []types.WatchKind{{Kind: types.KindAccessRequest}}},
EventFuncTimeout: handlerTimeout,
},
a.onWatcherEvent,
)
if err != nil {
return trace.Wrap(err)
}
a.SpawnCriticalJob(watcherJob)
ok, err := watcherJob.WaitReady(ctx)
if err != nil {
Expand Down
3 changes: 0 additions & 3 deletions integrations/access/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,6 @@ func (c BaseConfig) GetTeleportClient(ctx context.Context) (teleport.Client, err
Credentials: c.Teleport.Credentials(),
DialOpts: []grpc.DialOption{
grpc.WithConnectParams(grpc.ConnectParams{Backoff: bk, MinConnectTimeout: initTimeout}),
grpc.WithDefaultCallOptions(
grpc.WaitForReady(true),
),
grpc.WithReturnConnectionError(),
},
})
Expand Down
5 changes: 4 additions & 1 deletion integrations/access/jira/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,17 @@ func (a *App) run(ctx context.Context) error {
}
}

watcherJob := watcherjob.NewJob(
watcherJob, err := watcherjob.NewJob(
a.teleport,
watcherjob.Config{
Watch: types.Watch{Kinds: []types.WatchKind{types.WatchKind{Kind: types.KindAccessRequest}}},
EventFuncTimeout: handlerTimeout,
},
a.onWatcherEvent,
)
if err != nil {
return trace.Wrap(err)
}
a.SpawnCriticalJob(watcherJob)
watcherOk, err := watcherJob.WaitReady(ctx)
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion integrations/access/opsgenie/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,17 @@ func (a *App) run(ctx context.Context) error {
return trace.Wrap(err)
}

watcherJob := watcherjob.NewJob(
watcherJob, err := watcherjob.NewJob(
a.teleport,
watcherjob.Config{
Watch: types.Watch{Kinds: []types.WatchKind{types.WatchKind{Kind: types.KindAccessRequest}}},
EventFuncTimeout: handlerTimeout,
},
a.onWatcherEvent,
)
if err != nil {
return trace.Wrap(err)
}
a.SpawnCriticalJob(watcherJob)
ok, err := watcherJob.WaitReady(ctx)
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion integrations/access/pagerduty/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,17 @@ func (a *App) run(ctx context.Context) error {
return trace.Wrap(err)
}

watcherJob := watcherjob.NewJob(
watcherJob, err := watcherjob.NewJob(
a.teleport,
watcherjob.Config{
Watch: types.Watch{Kinds: []types.WatchKind{types.WatchKind{Kind: types.KindAccessRequest}}},
EventFuncTimeout: handlerTimeout,
},
a.onWatcherEvent,
)
if err != nil {
return trace.Wrap(err)
}
a.SpawnCriticalJob(watcherJob)
ok, err := watcherJob.WaitReady(ctx)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion integrations/lib/watcherjob/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,9 @@ func NewMockEventsProcess(ctx context.Context, t *testing.T, config Config, fn E
assert.NoError(t, process.Shutdown(ctx))
process.Close()
})
process.eventsJob = NewJobWithEvents(&process.Events, config, fn)
var err error
process.eventsJob, err = NewJobWithEvents(&process.Events, config, fn)
require.NoError(t, err)
process.SpawnCriticalJob(process.eventsJob)
require.NoError(t, process.Events.WaitSomeWatchers(ctx))
process.Events.Fire(types.Event{Type: types.OpInit})
Expand Down
46 changes: 39 additions & 7 deletions integrations/lib/watcherjob/watcherjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,33 @@ import (
"context"
"errors"
"io"
"os"
"strconv"
"time"

"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"

"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/integrations/access/common/teleport"
"github.com/gravitational/teleport/integrations/lib"
"github.com/gravitational/teleport/integrations/lib/backoff"
"github.com/gravitational/teleport/integrations/lib/logger"
)

const DefaultMaxConcurrency = 128
const DefaultEventFuncTimeout = time.Second * 5
const (
// DefaultMaxConcurrency is the value NewJobWithEvents substitutes when
// Config.MaxConcurrency is left at zero.
DefaultMaxConcurrency = 128
// DefaultEventFuncTimeout is the value NewJobWithEvents substitutes when
// Config.EventFuncTimeout is left at zero.
DefaultEventFuncTimeout = time.Second * 5
// failFastEnvVarName is the environment variable NewJobWithEvents reads
// (and parses as a boolean) when Config.FailFast is not already true.
failFastEnvVarName = "TELEPORT_PLUGIN_FAIL_FAST"
)

// EventFunc is the callback signature accepted by NewJob and NewJobWithEvents.
// It receives a context and a single types.Event and reports failure through
// its error result.
type EventFunc func(context.Context, types.Event) error

// Config holds the settings of a watcher job created by NewJob or
// NewJobWithEvents.
type Config struct {
// Watch describes what the job watches, such as the resource kinds.
Watch types.Watch
// MaxConcurrency defaults to DefaultMaxConcurrency when zero.
// NOTE(review): presumably caps concurrently running event handlers — confirm
// against the job's event processing code.
MaxConcurrency int
// EventFuncTimeout defaults to DefaultEventFuncTimeout when zero.
// NOTE(review): presumably bounds a single EventFunc call — confirm.
EventFuncTimeout time.Duration
// FailFast makes the watch loop return an error on a connection problem or a
// closed watcher stream (io.EOF) instead of reconnecting. When it is false and
// the TELEPORT_PLUGIN_FAIL_FAST environment variable is non-empty,
// NewJobWithEvents parses that variable with strconv.ParseBool and uses the
// result; an unparsable value makes job construction fail.
FailFast bool
}

type job struct {
Expand All @@ -52,17 +60,24 @@ type eventKey struct {
name string
}

func NewJob(client teleport.Client, config Config, fn EventFunc) lib.ServiceJob {
// NewJob creates a watcher service job that uses the given Teleport client as
// its event source. It delegates to NewJobWithEvents and returns that
// function's job and error unchanged.
func NewJob(client teleport.Client, config Config, fn EventFunc) (lib.ServiceJob, error) {
return NewJobWithEvents(client, config, fn)
}

func NewJobWithEvents(events types.Events, config Config, fn EventFunc) lib.ServiceJob {
func NewJobWithEvents(events types.Events, config Config, fn EventFunc) (lib.ServiceJob, error) {
if config.MaxConcurrency == 0 {
config.MaxConcurrency = DefaultMaxConcurrency
}
if config.EventFuncTimeout == 0 {
config.EventFuncTimeout = DefaultEventFuncTimeout
}
if flagVar := os.Getenv(failFastEnvVarName); !config.FailFast && flagVar != "" {
flag, err := strconv.ParseBool(flagVar)
if err != nil {
return nil, trace.WrapWithMessage(err, "failed to parse content '%s' of the %s environment variable", flagVar, failFastEnvVarName)
}
config.FailFast = flag
}
job := job{
events: events,
config: config,
Expand All @@ -83,14 +98,25 @@ func NewJobWithEvents(events types.Events, config Config, fn EventFunc) lib.Serv
return nil
})

bk := backoff.NewDecorr(20*time.Millisecond, 5*time.Second, clockwork.NewRealClock())

log := logger.Get(ctx)
for {
err := job.watchEvents(ctx)
// We are not supporting liveness/readiness yet, but if we do it would make sense to use job's readiness
job.SetReady(false)
Comment thread
marcoandredinis marked this conversation as resolved.
Outdated

switch {
case trace.IsConnectionProblem(err):
log.WithError(err).Error("Failed to connect to Teleport Auth server. Reconnecting...")
if config.FailFast {
return trace.WrapWithMessage(err, "Connection problem detected. Exiting as fail fast is on.")
}
log.WithError(err).Error("Connection problem detected. Attempting to reconnect.")
case errors.Is(err, io.EOF):
log.WithError(err).Error("Watcher stream closed. Reconnecting...")
if config.FailFast {
return trace.WrapWithMessage(err, "Watcher stream closed. Exiting as fail fast is on.")
}
log.WithError(err).Error("Watcher stream closed. Attempting to reconnect.")
case lib.IsCanceled(err):
log.Debug("Watcher context is canceled")
// Context cancellation is not an error
Expand All @@ -99,9 +125,15 @@ func NewJobWithEvents(events types.Events, config Config, fn EventFunc) lib.Serv
log.WithError(err).Error("Watcher event loop failed")
return trace.Wrap(err)
}

// To mitigate a potentially aggressive retry loop, we wait
if err := bk.Do(ctx); err != nil {
log.Debug("Watcher context was canceled while waiting before a reconnection")
return nil
}
}
})
return job
return job, nil
}

// watchEvents spawns a watcher and reads events from it.
Expand Down