Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 85 additions & 32 deletions lib/inventory/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"golang.org/x/time/rate"
Comment thread
zmb3 marked this conversation as resolved.

"github.com/gravitational/teleport/api/client"
"github.com/gravitational/teleport/api/client/proto"
Expand Down Expand Up @@ -135,6 +136,8 @@ type controllerOptions struct {
authID string
onConnectFunc func(string)
onDisconnectFunc func(string, int)
cleanupLimiter *rate.Limiter
cleanupTimeout time.Duration
clock clockwork.Clock
}

Expand Down Expand Up @@ -164,6 +167,16 @@ func (options *controllerOptions) SetDefaults() {
options.onDisconnectFunc = func(string, int) {}
}

if options.cleanupLimiter == nil {
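// allow an initial burst of up to 128 resource cleanup writes, after which writes are
// limited to one per second (rate.Every(time.Second) is a limit of 1/s; 128 is the burst
// size), to reduce the chances that agents with very large resource counts cause
// throttling on graceful disconnect.
// NOTE(review): if a sustained rate of 128 writes per second was intended, the limit
// should be rate.Limit(128) rather than rate.Every(time.Second) — confirm.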
options.cleanupLimiter = rate.NewLimiter(rate.Every(time.Second), 128)
}

if options.cleanupTimeout == 0 {
options.cleanupTimeout = time.Second * 30
}

if options.clock == nil {
options.clock = clockwork.NewRealClock()
}
Expand Down Expand Up @@ -245,6 +258,8 @@ type Controller struct {
testEvents chan testEvent
onConnectFunc func(string)
onDisconnectFunc func(string, int)
cleanupLimiter *rate.Limiter
cleanupTimeout time.Duration
clock clockwork.Clock
closeContext context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -317,6 +332,8 @@ func NewController(auth Auth, usageReporter usagereporter.UsageReporter, opts ..
usageReporter: usageReporter,
onConnectFunc: options.onConnectFunc,
onDisconnectFunc: options.onDisconnectFunc,
cleanupLimiter: options.cleanupLimiter,
cleanupTimeout: options.cleanupTimeout,
clock: options.clock,
closeContext: ctx,
cancel: cancel,
Expand Down Expand Up @@ -416,38 +433,7 @@ func (c *Controller) handleControlStream(handle *upstreamHandle) {

defer func() {
if handle.goodbye.GetDeleteResources() {
slog.DebugContext(c.closeContext, "Cleaning up resources in response to instance termination",
"apps", len(handle.appServers),
"dbs", len(handle.databaseServers),
"kube", len(handle.kubernetesServers),
"server_id", handle.Hello().ServerID,
)
for _, app := range handle.appServers {
if err := c.auth.DeleteApplicationServer(c.closeContext, apidefaults.Namespace, app.resource.GetHostID(), app.resource.GetName()); err != nil && !trace.IsNotFound(err) {
slog.WarnContext(c.closeContext, "Failed to remove app server on termination",
"app_server", handle.Hello().ServerID,
"error", err,
)
}
}

for _, db := range handle.databaseServers {
if err := c.auth.DeleteDatabaseServer(c.closeContext, apidefaults.Namespace, db.resource.GetHostID(), db.resource.GetName()); err != nil && !trace.IsNotFound(err) {
slog.WarnContext(c.closeContext, "Failed to remove db server on termination",
"db_server", handle.Hello().ServerID,
"error", err,
)
}
}

for _, kube := range handle.kubernetesServers {
if err := c.auth.DeleteKubernetesServer(c.closeContext, kube.resource.GetHostID(), kube.resource.GetName()); err != nil && !trace.IsNotFound(err) {
slog.WarnContext(c.closeContext, "Failed to remove kube server on termination",
"kube_server", handle.Hello().ServerID,
"error", err,
)
}
}
c.doResourceCleanup(handle)
}

c.instanceHBVariableDuration.Dec()
Expand Down Expand Up @@ -652,6 +638,73 @@ func variableRateHeartbeatsDisabledEnv() bool {
return os.Getenv("TELEPORT_UNSTABLE_DISABLE_VARIABLE_RATE_HEARTBEATS") == "yes"
}

// doResourceCleanup handles deletion of resources in response to a goodbye message. Cleanup may only
// be partially applied if too much concurrent cleanup is ongoing and/or if cleanup is taking too long:
// every deletion first waits on the controller's cleanupLimiter, and the whole operation runs under a
// context derived from closeContext that expires after cleanupTimeout. Once that context is done (or
// the limiter wait fails) the remaining resources are left in place and the function returns.
//
// Not-found errors from the delete calls are ignored; any other error is logged and cleanup continues
// with the next resource unless the cleanup context has ended.
func (c *Controller) doResourceCleanup(handle *upstreamHandle) {
	cleanupCtx, cancel := context.WithTimeout(c.closeContext, c.cleanupTimeout)
	defer cancel()

	serverID := handle.Hello().ServerID

	slog.DebugContext(c.closeContext, "Cleaning up resources in response to instance termination",
		"apps", len(handle.appServers),
		"dbs", len(handle.databaseServers),
		"kube", len(handle.kubernetesServers),
		"server_id", serverID,
	)

	// halt records why the remaining cleanup is being abandoned. Logging uses closeContext
	// rather than cleanupCtx, matching the other log calls in this function.
	halt := func(err error) {
		slog.WarnContext(c.closeContext, "halting remaining resource cleanup", "instance_id", serverID, "error", err)
	}

	for _, app := range handle.appServers {
		if err := c.cleanupLimiter.Wait(cleanupCtx); err != nil {
			halt(err)
			return
		}

		if err := c.auth.DeleteApplicationServer(cleanupCtx, apidefaults.Namespace, app.resource.GetHostID(), app.resource.GetName()); err != nil && !trace.IsNotFound(err) {
			if cleanupCtx.Err() != nil {
				halt(err)
				return
			}
			slog.WarnContext(c.closeContext, "Failed to remove app server on termination",
				"app_server", serverID,
				"error", err,
			)
		}
	}

	for _, db := range handle.databaseServers {
		if err := c.cleanupLimiter.Wait(cleanupCtx); err != nil {
			halt(err)
			return
		}

		if err := c.auth.DeleteDatabaseServer(cleanupCtx, apidefaults.Namespace, db.resource.GetHostID(), db.resource.GetName()); err != nil && !trace.IsNotFound(err) {
			if cleanupCtx.Err() != nil {
				halt(err)
				return
			}
			slog.WarnContext(c.closeContext, "Failed to remove db server on termination",
				"db_server", serverID,
				"error", err,
			)
		}
	}

	for _, kube := range handle.kubernetesServers {
		if err := c.cleanupLimiter.Wait(cleanupCtx); err != nil {
			halt(err)
			return
		}

		// Use cleanupCtx (not closeContext) so kube deletions are bounded by cleanupTimeout
		// like the app and db deletions, and so the cleanupCtx.Err() check below reflects
		// the context the call actually ran under.
		if err := c.auth.DeleteKubernetesServer(cleanupCtx, kube.resource.GetHostID(), kube.resource.GetName()); err != nil && !trace.IsNotFound(err) {
			if cleanupCtx.Err() != nil {
				halt(err)
				return
			}
			slog.WarnContext(c.closeContext, "Failed to remove kube server on termination",
				"kube_server", serverID,
				"error", err,
			)
		}
	}
}

func (c *Controller) heartbeatInstanceState(handle *upstreamHandle, now time.Time) error {
if !c.instanceHBEnabled {
return nil
Expand Down