Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 92 additions & 27 deletions lib/inventory/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ package inventory

import (
"context"
"log/slog"
"math/rand/v2"
"os"
"strings"
"time"

"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
log "github.com/sirupsen/logrus"

"github.com/gravitational/teleport/api/client"
"github.com/gravitational/teleport/api/client/proto"
Expand All @@ -39,6 +39,7 @@ import (
usagereporter "github.com/gravitational/teleport/lib/usagereporter/teleport"
"github.com/gravitational/teleport/lib/utils"
"github.com/gravitational/teleport/lib/utils/interval"
logutils "github.com/gravitational/teleport/lib/utils/log"
)

// Auth is an interface representing the subset of the auth API that must be made available
Expand Down Expand Up @@ -373,27 +374,36 @@ func (c *Controller) handleControlStream(handle *upstreamHandle) {

defer func() {
if handle.goodbye.GetDeleteResources() {
log.WithFields(log.Fields{
"apps": len(handle.appServers),
"dbs": len(handle.databaseServers),
"kube": len(handle.kubernetesServers),
"server_id": handle.Hello().ServerID,
}).Debug("Cleaning up resources in response to instance termination")
slog.DebugContext(c.closeContext, "Cleaning up resources in response to instance termination",
"apps", len(handle.appServers),
"dbs", len(handle.databaseServers),
"kube", len(handle.kubernetesServers),
"server_id", handle.Hello().ServerID,
)
for _, app := range handle.appServers {
if err := c.auth.DeleteApplicationServer(c.closeContext, apidefaults.Namespace, app.resource.GetHostID(), app.resource.GetName()); err != nil && !trace.IsNotFound(err) {
log.Warnf("Failed to remove app server %q on termination: %v.", handle.Hello().ServerID, err)
slog.WarnContext(c.closeContext, "Failed to remove app server on termination",
"app_server", handle.Hello().ServerID,
"error", err,
)
}
}

for _, db := range handle.databaseServers {
if err := c.auth.DeleteDatabaseServer(c.closeContext, apidefaults.Namespace, db.resource.GetHostID(), db.resource.GetName()); err != nil && !trace.IsNotFound(err) {
log.Warnf("Failed to remove db server %q on termination: %v.", handle.Hello().ServerID, err)
slog.WarnContext(c.closeContext, "Failed to remove db server on termination",
"db_server", handle.Hello().ServerID,
"error", err,
)
}
}

for _, kube := range handle.kubernetesServers {
if err := c.auth.DeleteKubernetesServer(c.closeContext, kube.resource.GetHostID(), kube.resource.GetName()); err != nil && !trace.IsNotFound(err) {
log.Warnf("Failed to remove kube server %q on termination: %v.", handle.Hello().ServerID, err)
slog.WarnContext(c.closeContext, "Failed to remove kube server on termination",
"kube_server", handle.Hello().ServerID,
"error", err,
)
}
}
}
Expand Down Expand Up @@ -432,7 +442,7 @@ func (c *Controller) handleControlStream(handle *upstreamHandle) {
case msg := <-handle.Recv():
switch m := msg.(type) {
case proto.UpstreamInventoryHello:
log.Warnf("Unexpected upstream hello on control stream of server %q.", handle.Hello().ServerID)
slog.WarnContext(c.closeContext, "Unexpected upstream hello on control stream of server", "server_id", handle.Hello().ServerID)
handle.CloseWithError(trace.BadParameter("unexpected upstream hello"))
return
case proto.UpstreamInventoryAgentMetadata:
Expand Down Expand Up @@ -477,7 +487,10 @@ func (c *Controller) handleControlStream(handle *upstreamHandle) {
case proto.UpstreamInventoryGoodbye:
handle.goodbye = m
default:
log.Warnf("Unexpected upstream message type %T on control stream of server %q.", m, handle.Hello().ServerID)
slog.WarnContext(c.closeContext, "Unexpected upstream message type on control stream",
"message_type", logutils.TypeAttr(m),
"server_id", handle.Hello().ServerID,
)
handle.CloseWithError(trace.BadParameter("unexpected upstream message type %T", m))
return
}
Expand Down Expand Up @@ -579,7 +592,10 @@ func (c *Controller) heartbeatInstanceState(handle *upstreamHandle, now time.Tim

instance, err := tracker.nextHeartbeat(now, handle.Hello(), c.authID)
if err != nil {
log.Warnf("Failed to construct next heartbeat value for instance %q: %v (this is a bug)", handle.Hello().ServerID, err)
slog.WarnContext(c.closeContext, "Failed to construct next heartbeat value for instance (this is a bug)",
"server_id", handle.Hello().ServerID,
"error", err,
)
return trace.Wrap(err)
}

Expand All @@ -592,7 +608,10 @@ func (c *Controller) heartbeatInstanceState(handle *upstreamHandle, now time.Tim
})

if err != nil {
log.Warnf("Failed to hb instance %q: %v", handle.Hello().ServerID, err)
slog.WarnContext(c.closeContext, "Failed to hb instance",
"server_id", handle.Hello().ServerID,
"error", err,
)
c.testEvent(instanceHeartbeatErr)
if !tracker.retryHeartbeat {
// suppress failure and retry exactly once
Expand All @@ -614,7 +633,9 @@ func (c *Controller) heartbeatInstanceState(handle *upstreamHandle, now time.Tim
func (c *Controller) handlePong(handle *upstreamHandle, msg proto.UpstreamInventoryPong) {
pending, ok := handle.pings[msg.ID]
if !ok {
log.Warnf("Unexpected upstream pong from server %q (id=%d).", handle.Hello().ServerID, msg.ID)
slog.WarnContext(c.closeContext, "Unexpected upstream pong",
"server_id", handle.Hello().ServerID,
"pong_id", msg.ID)
return
}
now := c.clock.Now()
Expand Down Expand Up @@ -718,7 +739,10 @@ func (c *Controller) handleSSHServerHB(handle *upstreamHandle, sshServer *types.
handle.sshServer.retryUpsert = false
} else {
c.testEvent(sshUpsertErr)
log.Warnf("Failed to upsert ssh server %q on heartbeat: %v.", handle.Hello().ServerID, err)
slog.WarnContext(c.closeContext, "Failed to upsert ssh server on heartbeat",
"server_id", handle.Hello().ServerID,
"error", err,
)

// blank old lease if any and set retry state. next time handleKeepAlive is called
// we will attempt to upsert the server again.
Expand Down Expand Up @@ -765,7 +789,10 @@ func (c *Controller) handleAppServerHB(handle *upstreamHandle, appServer *types.
srv.resource = appServer
} else {
c.testEvent(appUpsertErr)
log.Warnf("Failed to upsert app server %q on heartbeat: %v.", handle.Hello().ServerID, err)
slog.WarnContext(c.closeContext, "Failed to upsert app server on heartbeat",
"server_id", handle.Hello().ServerID,
"error", err,
)

// blank old lease if any and set retry state. next time handleKeepAlive is called
// we will attempt to upsert the server again.
Expand Down Expand Up @@ -813,7 +840,10 @@ func (c *Controller) handleDatabaseServerHB(handle *upstreamHandle, databaseServ
srv.resource = databaseServer
} else {
c.testEvent(dbUpsertErr)
log.Warnf("Failed to upsert database server %q on heartbeat: %v.", handle.Hello().ServerID, err)
slog.WarnContext(c.closeContext, "Failed to upsert database server on heartbeat",
"server_id", handle.Hello().ServerID,
"error", err,
)

// blank old lease if any and set retry state. next time handleKeepAlive is called
// we will attempt to upsert the server again.
Expand Down Expand Up @@ -861,7 +891,10 @@ func (c *Controller) handleKubernetesServerHB(handle *upstreamHandle, kubernetes
srv.resource = kubernetesServer
} else {
c.testEvent(kubeUpsertErr)
log.Warnf("Failed to upsert kubernetes server %q on heartbeat: %v.", handle.Hello().ServerID, err)
slog.WarnContext(c.closeContext, "Failed to upsert kubernetes server on heartbeat",
"server_id", handle.Hello().ServerID,
"error", err,
)

// blank old lease if any and set retry state. next time handleKeepAlive is called
// we will attempt to upsert the server again.
Expand Down Expand Up @@ -908,7 +941,12 @@ func (c *Controller) keepAliveAppServer(handle *upstreamHandle, now time.Time) e
srv.keepAliveErrs++
handle.appServers[name] = srv
shouldRemove := srv.keepAliveErrs > c.maxKeepAliveErrs
log.Warnf("Failed to keep alive app server %q: %v (count=%d, removing=%v).", handle.Hello().ServerID, err, srv.keepAliveErrs, shouldRemove)
slog.WarnContext(c.closeContext, "Failed to keep alive app server",
"server_id", handle.Hello().ServerID,
"error", err,
"error_count", srv.keepAliveErrs,
"should_remove", shouldRemove,
)

if shouldRemove {
c.testEvent(appKeepAliveDel)
Expand All @@ -924,7 +962,10 @@ func (c *Controller) keepAliveAppServer(handle *upstreamHandle, now time.Time) e
lease, err := c.auth.UpsertApplicationServer(c.closeContext, srv.resource)
if err != nil {
c.testEvent(appUpsertRetryErr)
log.Warnf("Failed to upsert app server %q on retry: %v.", handle.Hello().ServerID, err)
slog.WarnContext(c.closeContext, "Failed to upsert app server on retry",
"server_id", handle.Hello().ServerID,
"error", err,
)
// since this is retry-specific logic, an error here means that upsert failed twice in
// a row. Missing upserts is more problematic than missing keepalives so we don't bother
// attempting a third time.
Expand All @@ -951,7 +992,12 @@ func (c *Controller) keepAliveDatabaseServer(handle *upstreamHandle, now time.Ti
srv.keepAliveErrs++
handle.databaseServers[name] = srv
shouldRemove := srv.keepAliveErrs > c.maxKeepAliveErrs
log.Warnf("Failed to keep alive database server %q: %v (count=%d, removing=%v).", handle.Hello().ServerID, err, srv.keepAliveErrs, shouldRemove)
slog.WarnContext(c.closeContext, "Failed to keep alive database server",
"server_id", handle.Hello().ServerID,
"error", err,
"error_count", srv.keepAliveErrs,
"should_remove", shouldRemove,
)

if shouldRemove {
c.testEvent(dbKeepAliveDel)
Expand All @@ -967,7 +1013,10 @@ func (c *Controller) keepAliveDatabaseServer(handle *upstreamHandle, now time.Ti
lease, err := c.auth.UpsertDatabaseServer(c.closeContext, srv.resource)
if err != nil {
c.testEvent(dbUpsertRetryErr)
log.Warnf("Failed to upsert database server %q on retry: %v.", handle.Hello().ServerID, err)
slog.WarnContext(c.closeContext, "Failed to upsert database server on retry",
"server_id", handle.Hello().ServerID,
"error", err,
)
// since this is retry-specific logic, an error here means that upsert failed twice in
// a row. Missing upserts is more problematic than missing keepalives so we don't bother
// attempting a third time.
Expand All @@ -994,7 +1043,12 @@ func (c *Controller) keepAliveKubernetesServer(handle *upstreamHandle, now time.
srv.keepAliveErrs++
handle.kubernetesServers[name] = srv
shouldRemove := srv.keepAliveErrs > c.maxKeepAliveErrs
log.Warnf("Failed to keep alive kubernetes server %q: %v (count=%d, removing=%v).", handle.Hello().ServerID, err, srv.keepAliveErrs, shouldRemove)
slog.WarnContext(c.closeContext, "Failed to keep alive kubernetes server",
"server_id", handle.Hello().ServerID,
"error", err,
"error_count", srv.keepAliveErrs,
"should_remove", shouldRemove,
)

if shouldRemove {
c.testEvent(kubeKeepAliveDel)
Expand All @@ -1010,7 +1064,10 @@ func (c *Controller) keepAliveKubernetesServer(handle *upstreamHandle, now time.
lease, err := c.auth.UpsertKubernetesServer(c.closeContext, srv.resource)
if err != nil {
c.testEvent(kubeUpsertRetryErr)
log.Warnf("Failed to upsert kubernetes server %q on retry: %v.", handle.Hello().ServerID, err)
slog.WarnContext(c.closeContext, "Failed to upsert kubernetes server on retry.",
"server_id", handle.Hello().ServerID,
"error", err,
)
// since this is retry-specific logic, an error here means that upsert failed twice in
// a row. Missing upserts is more problematic than missing keepalives so we don't bother
// attempting a third time.
Expand Down Expand Up @@ -1039,7 +1096,12 @@ func (c *Controller) keepAliveSSHServer(handle *upstreamHandle, now time.Time) e
handle.sshServer.keepAliveErrs++
shouldClose := handle.sshServer.keepAliveErrs > c.maxKeepAliveErrs

log.Warnf("Failed to keep alive ssh server %q: %v (count=%d, closing=%v).", handle.Hello().ServerID, err, handle.sshServer.keepAliveErrs, shouldClose)
slog.WarnContext(c.closeContext, "Failed to keep alive ssh server",
"server_id", handle.Hello().ServerID,
"error", err,
"error_count", handle.sshServer.keepAliveErrs,
"should_remove", shouldClose,
)

if shouldClose {
return trace.Errorf("failed to keep alive ssh server: %v", err)
Expand All @@ -1053,7 +1115,10 @@ func (c *Controller) keepAliveSSHServer(handle *upstreamHandle, now time.Time) e
lease, err := c.auth.UpsertNode(c.closeContext, handle.sshServer.resource)
if err != nil {
c.testEvent(sshUpsertRetryErr)
log.Warnf("Failed to upsert ssh server %q on retry: %v.", handle.Hello().ServerID, err)
slog.WarnContext(c.closeContext, "Failed to upsert ssh server on retry",
"server_id", handle.Hello().ServerID,
"error", err,
)
// since this is retry-specific logic, an error here means that upsert failed twice in
// a row. Missing upserts is more problematic than missing keepalives so we don't bother
// attempting a third time.
Expand Down
14 changes: 7 additions & 7 deletions lib/inventory/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ import (
"context"
"errors"
"io"
"log/slog"
"sync"
"sync/atomic"
"time"

"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
log "github.com/sirupsen/logrus"

"github.com/gravitational/teleport/api/client"
"github.com/gravitational/teleport/api/client/proto"
Expand Down Expand Up @@ -163,7 +163,7 @@ func (h *downstreamHandle) autoEmitMetadata() {
md, err := h.metadataGetter(h.CloseContext())
if err != nil {
if !errors.Is(err, context.Canceled) {
log.Warnf("Failed to get agent metadata: %v", err)
slog.WarnContext(h.CloseContext(), "Failed to get agent metadata", "error", err)
}
return
}
Expand All @@ -188,7 +188,7 @@ func (h *downstreamHandle) autoEmitMetadata() {

// Send metadata.
if err := sender.Send(h.CloseContext(), msg); err != nil && !errors.Is(err, context.Canceled) {
log.Warnf("Failed to send agent metadata: %v", err)
slog.WarnContext(h.CloseContext(), "Failed to send agent metadata", "error", err)
}

// Block for the duration of the stream.
Expand All @@ -209,7 +209,7 @@ func (h *downstreamHandle) run(fn DownstreamCreateFunc, hello proto.UpstreamInve
return
}

log.Debugf("Re-attempt control stream acquisition in ~%s.", retry.Duration())
slog.DebugContext(h.closeContext, "Re-attempt control stream acquisition", "backoff", retry.Duration())
select {
case <-retry.After():
retry.Inc()
Expand All @@ -223,14 +223,14 @@ func (h *downstreamHandle) tryRun(fn DownstreamCreateFunc, hello proto.UpstreamI
stream, err := fn(h.CloseContext())
if err != nil {
if !h.closing() {
log.Warnf("Failed to create inventory control stream: %v.", err)
slog.WarnContext(h.CloseContext(), "Failed to create inventory control stream", "error", err)
}
return
}

if err := h.handleStream(stream, hello); err != nil {
if !h.closing() {
log.Warnf("Inventory control stream failed: %v", err)
slog.WarnContext(h.CloseContext(), "Inventory control stream failed", "error", err)
}
return
}
Expand Down Expand Up @@ -298,7 +298,7 @@ func (h *downstreamHandle) handlePing(sender DownstreamSender, msg proto.Downstr
h.mu.Lock()
defer h.mu.Unlock()
if len(h.pingHandlers) == 0 {
log.Warnf("Got ping with no handlers registered (id=%d).", msg.ID)
slog.WarnContext(h.closeContext, "Got ping with no handlers registered", "ping_id", msg.ID)
return
}
for _, handler := range h.pingHandlers {
Expand Down
9 changes: 4 additions & 5 deletions lib/inventory/metadata/metadata_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,18 @@
package metadata

import (
"runtime"

log "github.com/sirupsen/logrus"
"context"
"log/slog"
)

// fetchOSVersion returns "" if not on linux and not on darwin.
// This variant performs no lookup: it only emits a "not implemented" warning
// and unconditionally returns the empty string.
// NOTE(review): both a logrus and an slog warning appear below, which looks
// like the before/after lines of a diff; confirm only one is meant to remain.
func (c *fetchConfig) fetchOSVersion() string {
log.Warningf("fetchOSVersion is not implemented for %s", runtime.GOOS)
slog.WarnContext(context.Background(), "fetchOSVersion is not implemented")
return ""
}

// fetchGlibcVersion returns "" if not on linux and not on darwin.
// This variant performs no lookup: it only emits a "not implemented" warning
// and unconditionally returns the empty string.
// NOTE(review): both a logrus and an slog warning appear below, which looks
// like the before/after lines of a diff; confirm only one is meant to remain.
func (c *fetchConfig) fetchGlibcVersion() string {
log.Warningf("fetchGlibcVersion is not implemented for %s", runtime.GOOS)
slog.WarnContext(context.Background(), "fetchGlibcVersion is not implemented")
return ""
}