From 75f4d105cfee6bc9529f3198ed4b559c4afd0d22 Mon Sep 17 00:00:00 2001 From: Cam Hutchison Date: Thu, 11 Sep 2025 11:29:11 +1000 Subject: [PATCH 1/5] lib/config: Add static config for AccessGraph EKS audit logs Extend the static config for Access Graph discovery to be able to specify the EKS cluster for which apiserver audit logs should be fetched and sent to Access Graph. --- lib/config/configuration.go | 7 +++++++ lib/config/fileconf.go | 11 +++++++++++ 2 files changed, 18 insertions(+) diff --git a/lib/config/configuration.go b/lib/config/configuration.go index ab5d7c4d48285..742f7144230aa 100644 --- a/lib/config/configuration.go +++ b/lib/config/configuration.go @@ -1709,11 +1709,18 @@ kubernetes matchers are present`) Region: awsMatcher.CloudTrailLogs.QueueRegion, } } + var eksAuditLogs *types.AccessGraphAWSSyncEKSAuditLogs + if awsMatcher.EKSAuditLogs != nil { + eksAuditLogs = &types.AccessGraphAWSSyncEKSAuditLogs{ + Tags: awsMatcher.EKSAuditLogs.Tags, + } + } tMatcher.AWS = append(tMatcher.AWS, &types.AccessGraphAWSSync{ Regions: regions, AssumeRole: assumeRole, CloudTrailLogs: cloudTrailLogs, + EksAuditLogs: eksAuditLogs, }) } for _, azureMatcher := range fc.Discovery.AccessGraph.Azure { diff --git a/lib/config/fileconf.go b/lib/config/fileconf.go index 0b928306817b1..4ba0330bc4ed3 100644 --- a/lib/config/fileconf.go +++ b/lib/config/fileconf.go @@ -1640,6 +1640,17 @@ type AccessGraphAWSSync struct { // CloudTrailLogs is the configuration for the SQS queue to poll for // CloudTrail logs. CloudTrailLogs *AccessGraphAWSSyncCloudTrailLogs `yaml:"cloud_trail_logs,omitempty"` + // EKSAuditLogs is the configuration for fetching audit logs for EKS + // clusters discovered. + EKSAuditLogs *AccessGraphEKSAuditLogs `yaml:"eks_audit_logs,omitempty"` +} + +// AccessGraphEKSAuditLogs is the configuration for fetching audit logs from +// clusters discovered for access graph. +type AccessGraphEKSAuditLogs struct { + // Tags are AWS EKS tags to match. Clusters that have tags that match these + // will have their audit logs fetched and sent to Access Graph. + Tags map[string]apiutils.Strings `yaml:"tags,omitempty"` } // AccessGraphAzureSync represents the configuration for the Azure AccessGraph Sync service. From 6f38bb64ccd28741bf4d816b540e8c1ab9c8e951 Mon Sep 17 00:00:00 2001 From: Cam Hutchison Date: Thu, 11 Sep 2025 13:22:47 +1000 Subject: [PATCH 2/5] discovery: Add AWS EKS audit log fetching for Access Graph Add a watcher to start fetchers for all access graph EKS clusters that are configured to have Kubernetes apiserver audit logs fetched and send them to access graph. It receives the set of clusters to fetch audit logs for from the AWS resource syncer as it discovers EKS clusters. Those clusters are reconciled against the current set of log fetchers, with no-longer-needed fetchers stopped and new fetchers started as needed. This commit requires go.mod be updated with: go get github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs@latest It is left out of this commit for now as it makes rebasing/merging master easier. --- lib/srv/discovery/access_graph_aws.go | 64 ++++- lib/srv/discovery/eks_audit_log_fetcher.go | 191 +++++++++++++ lib/srv/discovery/eks_audit_log_watcher.go | 250 ++++++++++++++++++ .../discovery/fetchers/aws-sync/aws-sync.go | 18 ++ .../fetchers/aws-sync/cloudwatchlogs.go | 102 +++++++ .../discovery/fetchers/aws-sync/rds_test.go | 6 + 6 files changed, 620 insertions(+), 11 deletions(-) create mode 100644 lib/srv/discovery/eks_audit_log_fetcher.go create mode 100644 lib/srv/discovery/eks_audit_log_watcher.go create mode 100644 lib/srv/discovery/fetchers/aws-sync/cloudwatchlogs.go diff --git a/lib/srv/discovery/access_graph_aws.go b/lib/srv/discovery/access_graph_aws.go index 052df6aaf96a9..f1322c1370a47 100644 --- a/lib/srv/discovery/access_graph_aws.go +++ b/lib/srv/discovery/access_graph_aws.go @@ -73,10 +73,17 @@ const ( // errNoAccessGraphFetchers is returned when there are no TAG fetchers. var errNoAccessGraphFetchers = errors.New("no Access Graph fetchers") -func (s *Server) reconcileAccessGraph(ctx context.Context, currentTAGResources *aws_sync.Resources, stream accessgraphv1alpha.AccessGraphService_AWSEventsStreamClient, features aws_sync.Features) error { +func (s *Server) reconcileAccessGraph( + ctx context.Context, + currentTAGResources *aws_sync.Resources, + stream accessgraphv1alpha.AccessGraphService_AWSEventsStreamClient, + features aws_sync.Features, + eksAuditLogWatcher *eksAuditLogWatcher, +) error { type fetcherResult struct { - result *aws_sync.Resources - err error + fetcher *aws_sync.Fetcher + result *aws_sync.Resources + err error } allFetchers := s.getAllAWSSyncFetchers() @@ -88,6 +95,8 @@ func (s *Server) reconcileAccessGraph(ctx context.Context, currentTAGResources * if err := push(stream, upsert, toDel); err != nil { s.Log.ErrorContext(ctx, "Error pushing empty resources to TAGs", "error", err) } + // No clusters to fetch eks audit logs for. + eksAuditLogWatcher.Reconcile(ctx, nil) return trace.Wrap(errNoAccessGraphFetchers) } @@ -110,21 +119,39 @@ func (s *Server) reconcileAccessGraph(ctx context.Context, currentTAGResources * <-tokens }() result, err := fetcher.Poll(ctx, features) - resultsC <- fetcherResult{result, trace.Wrap(err)} + resultsC <- fetcherResult{fetcher, result, trace.Wrap(err)} }() } results := make([]*aws_sync.Resources, 0, len(allFetchers)) + auditLogClusters := make([]eksAuditLogCluster, 0, len(allFetchers)) errs := make([]error, 0, len(allFetchers)) // Collect the results from all fetchers. // Each fetcher can return an error and a result. for range allFetchers { fetcherResult := <-resultsC - if fetcherResult.err != nil { - errs = append(errs, fetcherResult.err) + fetcher, result, err := fetcherResult.fetcher, fetcherResult.result, fetcherResult.err + if err != nil { + errs = append(errs, err) } - if fetcherResult.result != nil { - results = append(results, fetcherResult.result) + if result == nil { + continue + } + results = append(results, result) + // If the fetcher is configured for EKS audit logs, see if any + // EKS clusters match the configured tags. + if fetcher.EKSAuditLogs == nil { + continue + } + for _, cluster := range result.EKSClusters { + clusterTags := make(map[string]string, len(cluster.Tags)) + for _, tag := range cluster.Tags { + clusterTags[tag.GetKey()] = tag.GetValue().GetValue() + } + match, _, _ := services.MatchLabels(fetcher.EKSAuditLogs.Tags, clusterTags) + if match { + auditLogClusters = append(auditLogClusters, eksAuditLogCluster{fetcher, cluster}) + } } } // Aggregate all errors into a single error. @@ -137,6 +164,10 @@ func (s *Server) reconcileAccessGraph(ctx context.Context, currentTAGResources * upsert, toDel := aws_sync.ReconcileResults(currentTAGResources, result) pushErr := push(stream, upsert, toDel) + // Send the updated list of clusters requiring audit logs to the fetcher. + // The fetcher reconciles this list against the last set sent. + eksAuditLogWatcher.Reconcile(ctx, auditLogClusters) + for _, fetcher := range allFetchers { s.tagSyncStatus.syncFinished(fetcher, pushErr, s.clock.Now()) } @@ -425,11 +456,16 @@ func (s *Server) initializeAndWatchAccessGraph(ctx context.Context, reloadCh <-c } s.Log.InfoContext(ctx, "Access graph service poll interval", "poll_interval", tickerInterval) + // Start the EKS audit log watcher that keeps track of the EKS audit log + // fetchers and updates them when Reconcile is called. + eksAuditLogWatcher := newEKSAuditLogWatcher(client, s.Log) + go eksAuditLogWatcher.Run(ctx) + currentTAGResources := &aws_sync.Resources{} timer := time.NewTimer(tickerInterval) defer timer.Stop() for { - err := s.reconcileAccessGraph(ctx, currentTAGResources, stream, features) + err := s.reconcileAccessGraph(ctx, currentTAGResources, stream, features, eksAuditLogWatcher) if errors.Is(err, errNoAccessGraphFetchers) { // no fetchers, no need to continue. // we will wait for the config to change and re-evaluate the fetchers @@ -508,7 +544,8 @@ func (s *Server) initTAGAWSWatchers(ctx context.Context, cfg *Config) error { continue } // reset the currentTAGResources to force a full sync - if err := s.initializeAndWatchAccessGraph(ctx, reloadCh); errors.Is(err, errTAGFeatureNotEnabled) { + err := s.initializeAndWatchAccessGraph(ctx, reloadCh) + if errors.Is(err, errTAGFeatureNotEnabled) { s.Log.WarnContext(ctx, "Access Graph specified in config, but the license does not include Teleport Identity Security. Access graph sync will not be enabled.") break } else if err != nil { @@ -576,6 +613,10 @@ func (s *Server) accessGraphAWSFetchersFromMatchers(ctx context.Context, matcher ExternalID: awsFetcher.AssumeRole.ExternalID, } } + var eksAuditLogs *aws_sync.EKSAuditLogs + if awsFetcher.EksAuditLogs != nil { + eksAuditLogs = &aws_sync.EKSAuditLogs{Tags: awsFetcher.EksAuditLogs.Tags} + } fetcher, err := aws_sync.NewFetcher( ctx, aws_sync.Config{ @@ -587,6 +628,7 @@ func (s *Server) accessGraphAWSFetchersFromMatchers(ctx context.Context, matcher Integration: awsFetcher.Integration, DiscoveryConfigName: discoveryConfigName, Log: s.Log, + EKSAuditLogs: eksAuditLogs, }, ) if err != nil { @@ -848,7 +890,7 @@ func (s *Server) receiveTAGResumeFromStream(ctx context.Context, stream accessgr return nil } -func consumeTillErr(stream accessgraphv1alpha.AccessGraphService_AWSCloudTrailStreamClient) error { +func consumeTillErr[Req any, Res any](stream grpc.BidiStreamingClient[Req, Res]) error { for { _, err := stream.Recv() if err != nil { diff --git a/lib/srv/discovery/eks_audit_log_fetcher.go b/lib/srv/discovery/eks_audit_log_fetcher.go new file mode 100644 index 0000000000000..017d140a74762 --- /dev/null +++ b/lib/srv/discovery/eks_audit_log_fetcher.go @@ -0,0 +1,191 @@ +/* + * Teleport + * Copyright (C) 2025 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package discovery + +import ( + "context" + "errors" + "log/slog" + "time" + + cwltypes "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types" + "github.com/gravitational/trace" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/types/known/structpb" + "google.golang.org/protobuf/types/known/timestamppb" + + accessgraphv1alpha "github.com/gravitational/teleport/gen/proto/go/accessgraph/v1alpha" + aws_sync "github.com/gravitational/teleport/lib/srv/discovery/fetchers/aws-sync" +) + +// eksAuditLogFetcher is a fetcher for EKS audit logs for a single cluster, +// fetching the logs from AWS Cloud Watch Logs. It uses the grpc stream +// to initiate the stream and possibly receive a resume state used to +// synchronize the start point with a previous run fetching the logs. +type eksAuditLogFetcher struct { + fetcher *aws_sync.Fetcher + cluster *accessgraphv1alpha.AWSEKSClusterV1 + stream accessgraphv1alpha.AccessGraphService_KubeAuditLogStreamClient + log *slog.Logger + cancel context.CancelFunc +} + +// Run continuously polls AWS Cloud Watch Logs for Kubernetes apiserver +// audit logs for the configured cluster. It feeds the logs retrieved to the +// configured grpc stream, running until the given context is canceled. +func (f *eksAuditLogFetcher) Run(ctx context.Context) error { + f.log = f.log.With("cluster", f.cluster.Arn) + + cursor := initialCursor(f.cluster) + if err := f.sendTAGKubeAuditLogNewStream(ctx, cursor); err != nil { + return trace.Wrap(err) + } + + cursor, err := f.receiveTAGKubeAuditLogResume(ctx) + if err != nil { + return trace.Wrap(err) + } + + for ctx.Err() == nil { + var events []*structpb.Struct + events, cursor = f.fetchLogs(ctx, cursor) + + if len(events) == 0 { + select { + case <-ctx.Done(): + case <-time.After(logPollInterval): + } + continue + } + + if err := f.sendTAGKubeAuditLogEvents(ctx, events, cursor); err != nil { + return trace.Wrap(err) + } + + f.log.DebugContext(ctx, "Sent KubeAuditLogEvents", "count", len(events), + "cursor_time", cursor.LastEventTime.AsTime()) + } + return trace.Wrap(ctx.Err()) +} + +// fetchLogs fetches a batch of logs from AWS Cloud Watch Logs after the given +// cursor position and unmarshals them into the protobuf Struct well-known +// type. +// +// It returns the fetched log entries and a new cursor for the next call. If an +// error occurs, it is logged, and the function returns nil logs and the +// original input cursor. This allows the caller to retry the operation. +func (f *eksAuditLogFetcher) fetchLogs(ctx context.Context, cursor *accessgraphv1alpha.KubeAuditLogCursor) ([]*structpb.Struct, *accessgraphv1alpha.KubeAuditLogCursor) { + awsEvents, err := f.fetcher.FetchEKSAuditLogs(ctx, f.cluster, cursor) + if err != nil { + if !errors.Is(err, context.Canceled) { + f.log.ErrorContext(ctx, "Failed to fetch EKS audit logs", "error", err) + } + return nil, cursor + } + + if len(awsEvents) == 0 { + return nil, cursor + } + + events := []*structpb.Struct{} + var awsEvent cwltypes.FilteredLogEvent + for _, awsEvent = range awsEvents { + // TODO(camscale): Track event sizes and don't go over protobuf message + // limit. newAccessGraphClient() sets the limit to 50MB + event := &structpb.Struct{} + m := protojson.UnmarshalOptions{} + err = m.Unmarshal([]byte(*awsEvent.Message), event) + if err != nil { + f.log.ErrorContext(ctx, "failed to protojson.Unmarshal", "error", err) + continue + } + events = append(events, event) + } + cursor = cursorFromEvent(f.cluster, awsEvent) + + return events, cursor +} + +func (f *eksAuditLogFetcher) sendTAGKubeAuditLogNewStream(ctx context.Context, cursor *accessgraphv1alpha.KubeAuditLogCursor) error { + err := f.stream.Send( + &accessgraphv1alpha.KubeAuditLogStreamRequest{ + Action: &accessgraphv1alpha.KubeAuditLogStreamRequest_NewStream{ + NewStream: &accessgraphv1alpha.KubeAuditLogNewStream{Initial: cursor}, + }, + }, + ) + if err != nil { + err = consumeTillErr(f.stream) + f.log.ErrorContext(ctx, "Failed to send accessgraph.KubeAuditLogNewStream", "error", err) + return trace.Wrap(err) + } + return nil +} + +func (f *eksAuditLogFetcher) receiveTAGKubeAuditLogResume(ctx context.Context) (*accessgraphv1alpha.KubeAuditLogCursor, error) { + msg, err := f.stream.Recv() + if err != nil { + return nil, trace.Wrap(err, "failed to receive KubeAuditLogStream resume state") + } + + state := msg.GetResumeState() + if state == nil { + return nil, trace.BadParameter("AccessGraphService.KubeAuditLogStream did not return KubeAuditLogResumeState message") + } + + f.log.InfoContext(ctx, "KubeAuditLogResumeState received", "state", state) + return state.Cursor, nil +} + +func (f *eksAuditLogFetcher) sendTAGKubeAuditLogEvents(ctx context.Context, events []*structpb.Struct, cursor *accessgraphv1alpha.KubeAuditLogCursor) error { + err := f.stream.Send( + &accessgraphv1alpha.KubeAuditLogStreamRequest{ + Action: &accessgraphv1alpha.KubeAuditLogStreamRequest_Events{ + Events: &accessgraphv1alpha.KubeAuditLogEvents{Events: events, Cursor: cursor}, + }, + }, + ) + if err != nil { + err = consumeTillErr(f.stream) + f.log.ErrorContext(ctx, "Failed to send accessgraph.KubeAuditLogEvents", "error", err) + return trace.Wrap(err) + } + return nil +} + +func cursorFromEvent(cluster *accessgraphv1alpha.AWSEKSClusterV1, event cwltypes.FilteredLogEvent) *accessgraphv1alpha.KubeAuditLogCursor { + return &accessgraphv1alpha.KubeAuditLogCursor{ + LogSource: accessgraphv1alpha.KubeAuditLogCursor_KUBE_AUDIT_LOG_SOURCE_EKS, + ClusterId: cluster.Arn, + EventId: *event.EventId, + LastEventTime: timestamppb.New(time.UnixMilli(*event.Timestamp)), + } +} + +// initialCursor returns a cursor for a EKS cluster that we have not previously +// retrieved logs from, so there is no resume state. The cursor is set to +// have logs retrieved back a standard amount of time. +func initialCursor(cluster *accessgraphv1alpha.AWSEKSClusterV1) *accessgraphv1alpha.KubeAuditLogCursor { + return &accessgraphv1alpha.KubeAuditLogCursor{ + LogSource: accessgraphv1alpha.KubeAuditLogCursor_KUBE_AUDIT_LOG_SOURCE_EKS, + ClusterId: cluster.Arn, + LastEventTime: timestamppb.New(time.Now().UTC().Add(-initialLogBacklog)), + } +} diff --git a/lib/srv/discovery/eks_audit_log_watcher.go b/lib/srv/discovery/eks_audit_log_watcher.go new file mode 100644 index 0000000000000..3b4059b48ec39 --- /dev/null +++ b/lib/srv/discovery/eks_audit_log_watcher.go @@ -0,0 +1,250 @@ +/* + * Teleport + * Copyright (C) 2025 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package discovery + +import ( + "context" + "errors" + "iter" + "log/slog" + "time" + + "github.com/gravitational/trace" + + accessgraphv1alpha "github.com/gravitational/teleport/gen/proto/go/accessgraph/v1alpha" + aws_sync "github.com/gravitational/teleport/lib/srv/discovery/fetchers/aws-sync" +) + +const ( + // initialLogBacklog is how far back to start retrieving EKS audit logs + // for newly discovered clusters. + // TODO(camscale): define in config. + initialLogBacklog = 7 * 24 * time.Hour + + // logPollInterval is the amount of time to sleep between fetching audit + // logs from Cloud Watch Logs for a cluster. + // TODO(camscale): perhaps define in config. + logPollInterval = 30 * time.Second +) + +// eksAuditLogCluster is a cluster for which audit logs should be fetched and +// the fetcher to use to do that. It is sent over a channel as a slice from +// the AWS resource watcher (access_graph_aws.go) to the EKS audit log watcher +// created here. For each one of these received, an asynchronous log fetcher +// is spawned to fetch Kubernetes apiserver audit logs from Cloud Watch Logs +// and sent to the grpc AccessGraphService via the KubeAuditLogStream rpc. +type eksAuditLogCluster struct { + fetcher *aws_sync.Fetcher + cluster *accessgraphv1alpha.AWSEKSClusterV1 +} + +// eksAuditLogWatcher is a watcher that waits for notifications on a channel +// indicating what EKS clusters should have audit logs fetched, and reconciles +// that against what is currently being fetched. Fetchers are started and +// stopped in response to this reconcilliation. +type eksAuditLogWatcher struct { + client accessgraphv1alpha.AccessGraphServiceClient + log *slog.Logger + auditLogClustersCh chan []eksAuditLogCluster + + fetchers map[string]*eksAuditLogFetcher + completedCh chan fetcherCompleted +} + +func newEKSAuditLogWatcher( + client accessgraphv1alpha.AccessGraphServiceClient, + logger *slog.Logger, +) *eksAuditLogWatcher { + return &eksAuditLogWatcher{ + client: client, + log: logger, + auditLogClustersCh: make(chan []eksAuditLogCluster), + fetchers: make(map[string]*eksAuditLogFetcher), + completedCh: make(chan fetcherCompleted), + } +} + +// fetcherCompleted captures the result of a completed eksAuditLogFetcher. +type fetcherCompleted struct { + fetcher *eksAuditLogFetcher + err error +} + +// Run starts a watcher by creating a KubeAuditLogStream on its grpc client. It +// negotiates a configuration (currently a no-op) and starts a main loop +// waiting for a list of clusters that it should run log fetchers for. As these +// lists of clusters arrives, it reconciles it against the running log fetchers +// and starts/stops log fetchers as required to match the given list. It +// completes when the given context is done. +// +// If any errors occur initializing the grpc stream, it is returned and the +// main loop is not run. +func (w *eksAuditLogWatcher) Run(ctx context.Context) error { + w.log.InfoContext(ctx, "EKS Audit Log Watcher started") + defer w.log.InfoContext(ctx, "EKS Audit Log Watcher completed") + + stream, err := w.client.KubeAuditLogStream(ctx) + if err != nil { + w.log.ErrorContext(ctx, "Failed to get access graph service KubeAuditLogStream", "error", err) + return trace.Wrap(err) + } + + config := &accessgraphv1alpha.KubeAuditLogConfig{} + if err := sendTAGKubeAuditLogConfig(ctx, stream, config); err != nil { + w.log.ErrorContext(ctx, "Failed to send access graph config", "error", err) + return trace.Wrap(err) + } + + config, err = receiveTAGKubeAuditLogConfig(ctx, stream) + if err != nil { + w.log.ErrorContext(ctx, "Failed to receive access graph config", "error", err) + return trace.Wrap(err) + } + w.log.InfoContext(ctx, "KubeAuditLogConfig received", "config", config) + + // Loop waiting for EKS clusters we need to fetch audit logs for on + // s.awsKubeAuditLogClustersCh channel (from the resource syncer). + // Reconcile that list of clusters against what we know and start/stop + // any log fetchers necessary. + for { + select { + case clusters := <-w.auditLogClustersCh: + w.reconcile(ctx, clusters, stream) + case completed := <-w.completedCh: + w.complete(ctx, completed) + case <-ctx.Done(): + return trace.Wrap(ctx.Err()) + } + } +} + +// Reconcile triggers a reconcilliation of currently running fetchers against +// the given slice of clusters. The reconcilliation will stop any fetchers for +// clusters not in the slice and start any fetchers for clusters in the slice +// that are not running. +// +// If the given context is done before the clusters can be sent to the +// reconcilliation goroutine, the context's error will be returned. +func (w *eksAuditLogWatcher) Reconcile(ctx context.Context, clusters []eksAuditLogCluster) error { + select { + case <-ctx.Done(): + return trace.Wrap(ctx.Err()) + case w.auditLogClustersCh <- clusters: + } + + return nil +} + +// reconcile compares the given slice of clusters against the currently running +// log fetchers and stops any running fetchers not in the cluster slice and +// starts a log fetcher for any cluster in the slice that does not have a +// running log fetcher. +// +// Log fetchers that are started are initialized with the given grpc stream +// over which they should send their audit logs. +func (w *eksAuditLogWatcher) reconcile(ctx context.Context, clusters []eksAuditLogCluster, stream accessgraphv1alpha.AccessGraphService_KubeAuditLogStreamClient) { + w.log.DebugContext(ctx, "Reconciling EKS audit log clusters", "new_count", len(clusters)) + + // Make a map of the discovered clusters, keyed by ARN so we can compare against + // the existing clusters we are fetching audit logs for. + discoveredClusters := make(map[string]eksAuditLogCluster) + for _, discovered := range clusters { + discoveredClusters[discovered.cluster.Arn] = discovered + } + // Stop any fetchers for clusters we are running fetcher for that discovery did not return. + for arn, existing := range mapDifference(w.fetchers, discoveredClusters) { + w.log.InfoContext(ctx, "Stopping eksKubeAuditLogFetcher", "cluster", arn) + existing.cancel() + // cleanup will happen when the fetcher finishes and is put on the completed channel. + } + // Start any new fetchers for clusters we are not running that discovery returned. + for arn, discovered := range mapDifference(discoveredClusters, w.fetchers) { + w.log.InfoContext(ctx, "Starting eksKubeAuditLogFetcher", "cluster", arn) + ctx, cancel := context.WithCancel(ctx) + logFetcher := &eksAuditLogFetcher{ + fetcher: discovered.fetcher, + cluster: discovered.cluster, + stream: stream, + log: w.log, + cancel: cancel, + } + w.fetchers[arn] = logFetcher + go func() { + err := logFetcher.Run(ctx) + select { + case w.completedCh <- fetcherCompleted{logFetcher, err}: + case <-ctx.Done(): + } + }() + } +} + +// complete cleans up the maintained list of running log fetchers, removing the +// given completed fetcher, and logs the completion status of the fetcher. +func (w *eksAuditLogWatcher) complete(ctx context.Context, completed fetcherCompleted) { + arn := completed.fetcher.cluster.Arn + if completed.err != nil && !errors.Is(completed.err, context.Canceled) { + w.log.ErrorContext(ctx, "eksKubeAuditLogFetcher completed with error", "cluster", arn, "error", completed.err) + } else { + w.log.InfoContext(ctx, "eksKubeAuditLogFetcher completed", "cluster", arn) + } + delete(w.fetchers, arn) +} + +// mapDifference yields all keys and values of m1 where the key is not in m2. It +// can be considered the set operation "mapDifference" - m1-m2, yielding all +// elements of m1 not in m2. +func mapDifference[K comparable, V1 any, V2 any](m1 map[K]V1, m2 map[K]V2) iter.Seq2[K, V1] { + return func(yield func(K, V1) bool) { + for k, v := range m1 { + if _, ok := m2[k]; !ok { + if !yield(k, v) { + return + } + } + } + } +} + +func sendTAGKubeAuditLogConfig(ctx context.Context, stream accessgraphv1alpha.AccessGraphService_KubeAuditLogStreamClient, config *accessgraphv1alpha.KubeAuditLogConfig) error { + err := stream.Send( + &accessgraphv1alpha.KubeAuditLogStreamRequest{ + Action: &accessgraphv1alpha.KubeAuditLogStreamRequest_Config{Config: config}, + }, + ) + if err != nil { + err = consumeTillErr(stream) + return trace.Wrap(err) + } + return nil +} + +func receiveTAGKubeAuditLogConfig(ctx context.Context, stream accessgraphv1alpha.AccessGraphService_KubeAuditLogStreamClient) (*accessgraphv1alpha.KubeAuditLogConfig, error) { + msg, err := stream.Recv() + if err != nil { + return nil, trace.Wrap(err, "failed to receive KubeAuditLogStream config") + } + + config := msg.GetConfig() + if config == nil { + return nil, trace.BadParameter("AccessGraphService.KubeAuditLogStream did not return KubeAuditLogConfig message") + } + + return config, nil +} diff --git a/lib/srv/discovery/fetchers/aws-sync/aws-sync.go b/lib/srv/discovery/fetchers/aws-sync/aws-sync.go index c66275dfd7945..369466c2de2bd 100644 --- a/lib/srv/discovery/fetchers/aws-sync/aws-sync.go +++ b/lib/srv/discovery/fetchers/aws-sync/aws-sync.go @@ -27,6 +27,7 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/aws/retry" + "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs" "github.com/aws/aws-sdk-go-v2/service/iam" "github.com/aws/aws-sdk-go-v2/service/kms" "github.com/aws/aws-sdk-go-v2/service/rds" @@ -36,6 +37,7 @@ import ( "golang.org/x/sync/errgroup" usageeventsv1 "github.com/gravitational/teleport/api/gen/proto/go/usageevents/v1" + "github.com/gravitational/teleport/api/types" accessgraphv1alpha "github.com/gravitational/teleport/gen/proto/go/accessgraph/v1alpha" "github.com/gravitational/teleport/lib/cloud/awsconfig" "github.com/gravitational/teleport/lib/srv/server" @@ -67,6 +69,9 @@ type Config struct { DiscoveryConfigName string // Log is the logger to use for logging. Log *slog.Logger + // EKSAuditLogs if set specifies the EKS clusters for which apiserver audit logs + // should be fetched. + EKSAuditLogs *EKSAuditLogs // awsClients provides AWS SDK clients. awsClients awsClientProvider @@ -128,6 +133,8 @@ type awsClientProvider interface { getSTSClient(cfg aws.Config, optFns ...func(*sts.Options)) stsClient // getKMSClient provides a [kmsClient]. getKMSClient(cfg aws.Config, optFns ...func(*kms.Options)) kmsClient + // getCloudWatchLogsClient provides a [cloudwatchlogs.FilterLogEventsAPIClient]. + getCloudWatchLogsClient(cfg aws.Config, optFns ...func(*cloudwatchlogs.Options)) cloudwatchlogs.FilterLogEventsAPIClient } type defaultAWSClients struct{} @@ -152,6 +159,10 @@ func (defaultAWSClients) getKMSClient(cfg aws.Config, optFns ...func(*kms.Option return kms.NewFromConfig(cfg, optFns...) } +func (defaultAWSClients) getCloudWatchLogsClient(cfg aws.Config, optFns ...func(*cloudwatchlogs.Options)) cloudwatchlogs.FilterLogEventsAPIClient { + return cloudwatchlogs.NewFromConfig(cfg, optFns...) +} + // AssumeRole is the configuration for assuming an AWS role. type AssumeRole struct { // RoleARN is the ARN of the role to assume. @@ -160,6 +171,13 @@ type AssumeRole struct { ExternalID string } +// EKSAuditLogs is the configuration of which discovered EKS clusters should have +// their apiserver audit logs fetched and sent to Access Graph. +type EKSAuditLogs struct { + // Tags is a set of name/value tags that an EKS cluster must have for audit log fetching. + Tags types.Labels +} + // Fetcher is a fetcher that fetches AWS resources. type Fetcher struct { Config diff --git a/lib/srv/discovery/fetchers/aws-sync/cloudwatchlogs.go b/lib/srv/discovery/fetchers/aws-sync/cloudwatchlogs.go new file mode 100644 index 0000000000000..6800b75b2de2e --- /dev/null +++ b/lib/srv/discovery/fetchers/aws-sync/cloudwatchlogs.go @@ -0,0 +1,102 @@ +/* + * Teleport + * Copyright (C) 2025 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package aws_sync + +import ( + "context" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs" + cwltypes "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types" + "github.com/gravitational/trace" + + accessgraphv1alpha "github.com/gravitational/teleport/gen/proto/go/accessgraph/v1alpha" +) + +// FetchEKSAuditLogs returns a slice of audit log events for the given cluster +// starting from the given cursor. +func (a *Fetcher) FetchEKSAuditLogs(ctx context.Context, cluster *accessgraphv1alpha.AWSEKSClusterV1, cursor *accessgraphv1alpha.KubeAuditLogCursor) ([]cwltypes.FilteredLogEvent, error) { + cfg, err := a.AWSConfigProvider.GetConfig(ctx, cluster.Region, a.getAWSOptions()...) + if err != nil { + return nil, trace.Wrap(err) + } + client := a.awsClients.getCloudWatchLogsClient(cfg) + + // limit is not a hard limit - we may exceed it but won't get any more pages + // once reached. + var limit int32 = 500 // TODO(camscale): Consider making this a parameter + startTime := cursor.LastEventTime.AsTime().UTC() + input := &cloudwatchlogs.FilterLogEventsInput{ + LogGroupName: aws.String("/aws/eks/" + cluster.Name + "/cluster"), + LogStreamNamePrefix: aws.String("kube-apiserver-audit-"), + StartTime: aws.Int64(startTime.UnixMilli()), + Limit: aws.Int32(limit), + } + + var result []cwltypes.FilteredLogEvent + for p := cloudwatchlogs.NewFilterLogEventsPaginator(client, input); p.HasMorePages(); { + output, err := p.NextPage(ctx) + if err != nil { + return nil, trace.Wrap(err) + } + + eventsAfterCursor := cwlEventsAfterCursor(output.Events, cursor) + if eventsAfterCursor != nil { + cursor = nil + result = append(result, eventsAfterCursor...) + if len(result) >= int(limit) { + break + } + } + } + + return result, nil +} + +// cwlEventsAfterCursor returns the events from the given events after the +// cursor. If the cursor was not found, but the timestamp of the cursor +// was not passed, then nil is returned. In this case, the cursor is still +// valid and should continue to be used to find the next event. Otherwise +// a slice (possibly empty) is returned which means the cursor was consumed; +// either the event ID in the cursor was found, or we passed the timestamp +// of the cursor. +// If the cursor is nil, the events are returned unfiltered. +func cwlEventsAfterCursor(events []cwltypes.FilteredLogEvent, cursor *accessgraphv1alpha.KubeAuditLogCursor) []cwltypes.FilteredLogEvent { + // If we're not looking for events from a cursor position, just return all events. + if cursor == nil || cursor.EventId == "" { + return events + } + + startTime := cursor.LastEventTime.AsTime().UTC() + for i, event := range events { + // If we never saw cursor.EventId with the given timestamp, + // just return all the events. + if time.UnixMilli(*event.Timestamp).UTC().After(startTime) { + return events + } + if *event.EventId == cursor.EventId { + return events[i+1:] + } + } + // The cursor was not found in the events, but it was not discarded as + // the timestamp on the events did not move past the cursor timestamp. + // A nil slice (as opposed to an empty slice) indicates this. + return nil +} diff --git a/lib/srv/discovery/fetchers/aws-sync/rds_test.go b/lib/srv/discovery/fetchers/aws-sync/rds_test.go index ce1ae0ed43e1a..6211212c6f002 100644 --- a/lib/srv/discovery/fetchers/aws-sync/rds_test.go +++ b/lib/srv/discovery/fetchers/aws-sync/rds_test.go @@ -23,6 +23,7 @@ import ( "testing" "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs" "github.com/aws/aws-sdk-go-v2/service/iam" "github.com/aws/aws-sdk-go-v2/service/kms" "github.com/aws/aws-sdk-go-v2/service/rds" @@ -224,6 +225,7 @@ type fakeAWSClients struct { s3Client s3Client stsClient stsClient kmsClient kmsClient + cwlClient cloudwatchlogs.FilterLogEventsAPIClient } func (f fakeAWSClients) getIAMClient(cfg aws.Config, optFns ...func(*iam.Options)) iamClient { @@ -245,3 +247,7 @@ func (f fakeAWSClients) getSTSClient(cfg aws.Config, optFns ...func(*sts.Options func (f fakeAWSClients) getKMSClient(cfg aws.Config, optFns ...func(*kms.Options)) kmsClient { return f.kmsClient } + +func (f fakeAWSClients) getCloudWatchLogsClient(cfg aws.Config, optFns ...func(*cloudwatchlogs.Options)) cloudwatchlogs.FilterLogEventsAPIClient { + return f.cwlClient +} From a9eadeca59993f64b3111c5fc0bbd8678ab013fa Mon Sep 17 00:00:00 2001 From: Cam Hutchison Date: Fri, 19 Sep 2025 13:01:10 +1000 Subject: [PATCH 3/5] Add github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs to go.mod Run: go get github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs@latest make go-mod-tidy-all # Manually move the go.mod line back to the first section!?!? This commit is kept separate for easier merging/rebasing. --- go.mod | 9 +++++---- go.sum | 18 ++++++++++-------- integrations/event-handler/go.mod | 8 ++++---- integrations/event-handler/go.sum | 16 ++++++++-------- integrations/terraform-mwi/go.mod | 9 +++++---- integrations/terraform-mwi/go.sum | 18 ++++++++++-------- integrations/terraform/go.mod | 9 +++++---- integrations/terraform/go.sum | 18 ++++++++++-------- 8 files changed, 57 insertions(+), 48 deletions(-) diff --git a/go.mod b/go.mod index a31b41385c73f..55e4fbbb083c9 100644 --- a/go.mod +++ b/go.mod @@ -55,7 +55,7 @@ require ( github.com/andybalholm/brotli v1.2.0 github.com/aquasecurity/libbpfgo v0.5.1-libbpf-1.2 github.com/armon/go-radix v1.0.0 - github.com/aws/aws-sdk-go-v2 v1.39.5 + github.com/aws/aws-sdk-go-v2 v1.39.6 github.com/aws/aws-sdk-go-v2/config v1.31.16 github.com/aws/aws-sdk-go-v2/credentials v1.18.20 github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.20.20 @@ -66,6 +66,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/applicationautoscaling v1.41.1 github.com/aws/aws-sdk-go-v2/service/athena v1.55.9 github.com/aws/aws-sdk-go-v2/service/bedrockruntime v1.42.1 + github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.58.7 github.com/aws/aws-sdk-go-v2/service/dax v1.29.4 github.com/aws/aws-sdk-go-v2/service/dynamodb v1.52.3 github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.32.1 @@ -313,9 +314,9 @@ require ( github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect github.com/atotto/clipboard v0.1.4 // indirect github.com/aws/aws-sdk-go v1.55.8 // indirect - github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.12 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.12 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 // indirect github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.12 // indirect github.com/aws/aws-sdk-go-v2/service/ecr v1.45.1 // indirect diff --git a/go.sum b/go.sum index 279cb31136885..9601da3febbcc 100644 --- a/go.sum +++ b/go.sum @@ -794,10 +794,10 @@ github.com/aws/aws-sdk-go v1.49.12/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3Tj github.com/aws/aws-sdk-go v1.55.8 h1:JRmEUbU52aJQZ2AjX4q4Wu7t4uZjOu71uyNmaWlUkJQ= github.com/aws/aws-sdk-go v1.55.8/go.mod h1:ZkViS9AqA6otK+JBBNH2++sx1sgxrPKcSzPPvQkUtXk= github.com/aws/aws-sdk-go-v2 v1.18.0/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= -github.com/aws/aws-sdk-go-v2 v1.39.5 h1:e/SXuia3rkFtapghJROrydtQpfQaaUgd1cUvyO1mp2w= -github.com/aws/aws-sdk-go-v2 v1.39.5/go.mod h1:yWSxrnioGUZ4WVv9TgMrNUeLV3PFESn/v+6T/Su8gnM= -github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2 h1:t9yYsydLYNBk9cJ73rgPhPWqOh/52fcWDQB5b1JsKSY= -github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2/go.mod h1:IusfVNTmiSN3t4rhxWFaBAqn+mcNdwKtPcV16eYdgko= +github.com/aws/aws-sdk-go-v2 v1.39.6 h1:2JrPCVgWJm7bm83BDwY5z8ietmeJUbh3O2ACnn+Xsqk= +github.com/aws/aws-sdk-go-v2 v1.39.6/go.mod h1:c9pm7VwuW0UPxAEYGyTmyurVcNrbF6Rt/wixFqDhcjE= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3 h1:DHctwEM8P8iTXFxC/QK0MRjwEpWQeM9yzidCRjldUz0= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3/go.mod h1:xdCzcZEtnSTKVDOmUZs4l/j3pSV6rpo1WXl5ugNsL8Y= github.com/aws/aws-sdk-go-v2/config v1.18.25/go.mod h1:dZnYpD5wTW/dQF0rRNLVypB396zWCcPiBIvdvSWHEg4= github.com/aws/aws-sdk-go-v2/config v1.31.16 h1:E4Tz+tJiPc7kGnXwIfCyUj6xHJNpENlY11oKpRTgsjc= github.com/aws/aws-sdk-go-v2/config v1.31.16/go.mod h1:2S9hBElpCyGMifv14WxQ7EfPumgoeCPZUpuPX8VtW34= @@ -816,11 +816,11 @@ github.com/aws/aws-sdk-go-v2/feature/rds/auth v1.6.12/go.mod h1:T/o6k3LG7Ew45+Jz github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.2 h1:9/HxDeIgA7DcKK6e6ZaP5PQiXugYbNERx3Z5u30mN+k= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.2/go.mod h1:3N1RoxKNcVHmbOKVMMw8pvMs5TUhGYPQP/aq1zmAWqo= github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33/go.mod h1:7i0PF1ME/2eUPFcjkVIwq+DOygHEoK92t5cDqNgYbIw= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.12 h1:p/9flfXdoAnwJnuW9xHEAFY22R3A6skYkW19JFF9F+8= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.12/go.mod h1:ZTLHakoVCTtW8AaLGSwJ3LXqHD9uQKnOcv1TrpO6u2k= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13 h1:a+8/MLcWlIxo1lF9xaGt3J/u3yOZx+CdSveSNwjhD40= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13/go.mod h1:oGnKwIYZ4XttyU2JWxFrwvhF6YKiK/9/wmE3v3Iu9K8= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27/go.mod h1:UrHnn3QV/d0pBZ6QBAEQcqFLf8FAzLmoUfPVIueOvoM= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.12 h1:2lTWFvRcnWFFLzHWmtddu5MTchc5Oj2OOey++99tPZ0= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.12/go.mod h1:hI92pK+ho8HVcWMHKHrK3Uml4pfG7wvL86FzO0LVtQQ= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13 h1:HBSI2kDkMdWz4ZM7FjwE7e/pWDEZ+nR95x8Ztet1ooY= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13/go.mod h1:YE94ZoDArI7awZqJzBAZ3PDD2zSfuP7w6P2knOzIn8M= github.com/aws/aws-sdk-go-v2/internal/ini v1.3.34/go.mod h1:Etz2dj6UHYuw+Xw830KfzCfWGMzqvUTCjUj5b76GVDc= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 h1:WKuaxf++XKWlHWu9ECbMlha8WOEGm0OUEZqm4K/Gcfk= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4/go.mod h1:ZWy7j6v1vWGmPReu0iSGvRiise4YI5SkR3OHKTZ6Wuc= @@ -832,6 +832,8 @@ github.com/aws/aws-sdk-go-v2/service/athena v1.55.9 h1:w50cPLPIyWSzh4bqgA/h0nzRw github.com/aws/aws-sdk-go-v2/service/athena v1.55.9/go.mod h1:jTVF/+wNGjLD94jaJxDqhWexDeH7r4zZkQ7bbboAf1I= github.com/aws/aws-sdk-go-v2/service/bedrockruntime v1.42.1 h1:F/ZU3z+tNCIDhUD8wFEalX1GMdtU0SQlIXXi/hPFFpE= github.com/aws/aws-sdk-go-v2/service/bedrockruntime v1.42.1/go.mod h1:PfutSAwCVczCH5sBPjuPc1pkjaSokL4DsJNlrLC3kww= +github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.58.7 h1:Yj4NvoEEdSxA90x/uCBskzeF3OxZr72Yaf64n0fIVR4= +github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.58.7/go.mod h1:9/Q0/HtqBTLMksFse42wZjUq0jJrUuo4XlnXy/uSoeg= github.com/aws/aws-sdk-go-v2/service/dax v1.29.4 h1:IBtJf1olfwspLm3d20sDFstQTlHgL+dA2NtdXQMDius= github.com/aws/aws-sdk-go-v2/service/dax v1.29.4/go.mod h1:XciMFO7HCGFzOlvNl42HsWNakjQtt/JHzW26VzlahGI= github.com/aws/aws-sdk-go-v2/service/dynamodb v1.52.3 h1:28+obyib2FhFKASJ6qSPbuteiy0nvvcvfItdAAYure0= diff --git a/integrations/event-handler/go.mod b/integrations/event-handler/go.mod index 39e955d8ea923..a43384c55485d 100644 --- a/integrations/event-handler/go.mod +++ b/integrations/event-handler/go.mod @@ -71,14 +71,14 @@ require ( github.com/alecthomas/units v0.0.0-20240927000941-0f3dac36c52b // indirect github.com/armon/go-radix v1.0.0 // indirect github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect - github.com/aws/aws-sdk-go-v2 v1.39.5 // indirect - github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2 // indirect + github.com/aws/aws-sdk-go-v2 v1.39.6 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3 // indirect github.com/aws/aws-sdk-go-v2/config v1.31.16 // indirect github.com/aws/aws-sdk-go-v2/credentials v1.18.20 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.12 // indirect github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.2 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.12 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.12 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 // indirect github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.12 // indirect github.com/aws/aws-sdk-go-v2/service/athena v1.55.9 // indirect diff --git a/integrations/event-handler/go.sum b/integrations/event-handler/go.sum index 8634c4c88bfa6..041b107eb3246 100644 --- a/integrations/event-handler/go.sum +++ b/integrations/event-handler/go.sum @@ -760,10 +760,10 @@ github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 h1:DklsrG3d github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw= github.com/aws/aws-sdk-go v1.55.8 h1:JRmEUbU52aJQZ2AjX4q4Wu7t4uZjOu71uyNmaWlUkJQ= github.com/aws/aws-sdk-go v1.55.8/go.mod h1:ZkViS9AqA6otK+JBBNH2++sx1sgxrPKcSzPPvQkUtXk= -github.com/aws/aws-sdk-go-v2 v1.39.5 h1:e/SXuia3rkFtapghJROrydtQpfQaaUgd1cUvyO1mp2w= -github.com/aws/aws-sdk-go-v2 v1.39.5/go.mod h1:yWSxrnioGUZ4WVv9TgMrNUeLV3PFESn/v+6T/Su8gnM= -github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2 h1:t9yYsydLYNBk9cJ73rgPhPWqOh/52fcWDQB5b1JsKSY= -github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2/go.mod h1:IusfVNTmiSN3t4rhxWFaBAqn+mcNdwKtPcV16eYdgko= +github.com/aws/aws-sdk-go-v2 v1.39.6 h1:2JrPCVgWJm7bm83BDwY5z8ietmeJUbh3O2ACnn+Xsqk= +github.com/aws/aws-sdk-go-v2 v1.39.6/go.mod h1:c9pm7VwuW0UPxAEYGyTmyurVcNrbF6Rt/wixFqDhcjE= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3 h1:DHctwEM8P8iTXFxC/QK0MRjwEpWQeM9yzidCRjldUz0= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3/go.mod h1:xdCzcZEtnSTKVDOmUZs4l/j3pSV6rpo1WXl5ugNsL8Y= github.com/aws/aws-sdk-go-v2/config v1.31.16 h1:E4Tz+tJiPc7kGnXwIfCyUj6xHJNpENlY11oKpRTgsjc= github.com/aws/aws-sdk-go-v2/config v1.31.16/go.mod h1:2S9hBElpCyGMifv14WxQ7EfPumgoeCPZUpuPX8VtW34= github.com/aws/aws-sdk-go-v2/credentials v1.18.20 h1:KFndAnHd9NUuzikHjQ8D5CfFVO+bgELkmcGY8yAw98Q= @@ -772,10 +772,10 @@ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.12 h1:VO3FIM2TDbm0kqp6sFNR0P github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.12/go.mod h1:6C39gB8kg82tx3r72muZSrNhHia9rjGkX7ORaS2GKNE= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.2 h1:9/HxDeIgA7DcKK6e6ZaP5PQiXugYbNERx3Z5u30mN+k= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.2/go.mod h1:3N1RoxKNcVHmbOKVMMw8pvMs5TUhGYPQP/aq1zmAWqo= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.12 h1:p/9flfXdoAnwJnuW9xHEAFY22R3A6skYkW19JFF9F+8= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.12/go.mod h1:ZTLHakoVCTtW8AaLGSwJ3LXqHD9uQKnOcv1TrpO6u2k= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.12 h1:2lTWFvRcnWFFLzHWmtddu5MTchc5Oj2OOey++99tPZ0= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.12/go.mod h1:hI92pK+ho8HVcWMHKHrK3Uml4pfG7wvL86FzO0LVtQQ= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13 h1:a+8/MLcWlIxo1lF9xaGt3J/u3yOZx+CdSveSNwjhD40= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13/go.mod h1:oGnKwIYZ4XttyU2JWxFrwvhF6YKiK/9/wmE3v3Iu9K8= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13 h1:HBSI2kDkMdWz4ZM7FjwE7e/pWDEZ+nR95x8Ztet1ooY= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13/go.mod h1:YE94ZoDArI7awZqJzBAZ3PDD2zSfuP7w6P2knOzIn8M= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 h1:WKuaxf++XKWlHWu9ECbMlha8WOEGm0OUEZqm4K/Gcfk= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4/go.mod h1:ZWy7j6v1vWGmPReu0iSGvRiise4YI5SkR3OHKTZ6Wuc= github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.12 h1:itu4KHu8JK/N6NcLIISlf3LL1LccMqruLUXZ9y7yBZw= diff --git a/integrations/terraform-mwi/go.mod b/integrations/terraform-mwi/go.mod index 4ebcad8fde40d..da24a1394eb2b 100644 --- a/integrations/terraform-mwi/go.mod +++ b/integrations/terraform-mwi/go.mod @@ -84,8 +84,8 @@ require ( github.com/armon/go-radix v1.0.0 // indirect github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect github.com/aws/aws-sdk-go v1.55.8 // indirect - github.com/aws/aws-sdk-go-v2 v1.39.5 // indirect - github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2 // indirect + github.com/aws/aws-sdk-go-v2 v1.39.6 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3 // indirect github.com/aws/aws-sdk-go-v2/config v1.31.16 // indirect github.com/aws/aws-sdk-go-v2/credentials v1.18.20 // indirect github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.20.20 // indirect @@ -93,13 +93,14 @@ require ( github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.12 // indirect github.com/aws/aws-sdk-go-v2/feature/rds/auth v1.6.12 // indirect github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.2 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.12 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.12 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 // indirect github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.12 // indirect github.com/aws/aws-sdk-go-v2/service/applicationautoscaling v1.41.1 // indirect github.com/aws/aws-sdk-go-v2/service/athena v1.55.9 // indirect github.com/aws/aws-sdk-go-v2/service/bedrockruntime v1.42.1 // indirect + github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.58.7 // indirect github.com/aws/aws-sdk-go-v2/service/dax v1.29.4 // indirect github.com/aws/aws-sdk-go-v2/service/dynamodb v1.52.3 // indirect github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.32.1 // indirect diff --git a/integrations/terraform-mwi/go.sum b/integrations/terraform-mwi/go.sum index a60d1b48c9f64..95b7380071fd3 100644 --- a/integrations/terraform-mwi/go.sum +++ b/integrations/terraform-mwi/go.sum @@ -796,10 +796,10 @@ github.com/aws/aws-sdk-go v1.49.12/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3Tj github.com/aws/aws-sdk-go v1.55.8 h1:JRmEUbU52aJQZ2AjX4q4Wu7t4uZjOu71uyNmaWlUkJQ= github.com/aws/aws-sdk-go v1.55.8/go.mod h1:ZkViS9AqA6otK+JBBNH2++sx1sgxrPKcSzPPvQkUtXk= github.com/aws/aws-sdk-go-v2 v1.18.0/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= -github.com/aws/aws-sdk-go-v2 v1.39.5 h1:e/SXuia3rkFtapghJROrydtQpfQaaUgd1cUvyO1mp2w= -github.com/aws/aws-sdk-go-v2 v1.39.5/go.mod h1:yWSxrnioGUZ4WVv9TgMrNUeLV3PFESn/v+6T/Su8gnM= -github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2 h1:t9yYsydLYNBk9cJ73rgPhPWqOh/52fcWDQB5b1JsKSY= -github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2/go.mod h1:IusfVNTmiSN3t4rhxWFaBAqn+mcNdwKtPcV16eYdgko= +github.com/aws/aws-sdk-go-v2 v1.39.6 h1:2JrPCVgWJm7bm83BDwY5z8ietmeJUbh3O2ACnn+Xsqk= +github.com/aws/aws-sdk-go-v2 v1.39.6/go.mod h1:c9pm7VwuW0UPxAEYGyTmyurVcNrbF6Rt/wixFqDhcjE= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3 h1:DHctwEM8P8iTXFxC/QK0MRjwEpWQeM9yzidCRjldUz0= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3/go.mod h1:xdCzcZEtnSTKVDOmUZs4l/j3pSV6rpo1WXl5ugNsL8Y= github.com/aws/aws-sdk-go-v2/config v1.18.25/go.mod h1:dZnYpD5wTW/dQF0rRNLVypB396zWCcPiBIvdvSWHEg4= github.com/aws/aws-sdk-go-v2/config v1.31.16 h1:E4Tz+tJiPc7kGnXwIfCyUj6xHJNpENlY11oKpRTgsjc= github.com/aws/aws-sdk-go-v2/config v1.31.16/go.mod h1:2S9hBElpCyGMifv14WxQ7EfPumgoeCPZUpuPX8VtW34= @@ -818,11 +818,11 @@ github.com/aws/aws-sdk-go-v2/feature/rds/auth v1.6.12/go.mod h1:T/o6k3LG7Ew45+Jz github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.2 h1:9/HxDeIgA7DcKK6e6ZaP5PQiXugYbNERx3Z5u30mN+k= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.2/go.mod h1:3N1RoxKNcVHmbOKVMMw8pvMs5TUhGYPQP/aq1zmAWqo= github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33/go.mod h1:7i0PF1ME/2eUPFcjkVIwq+DOygHEoK92t5cDqNgYbIw= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.12 h1:p/9flfXdoAnwJnuW9xHEAFY22R3A6skYkW19JFF9F+8= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.12/go.mod h1:ZTLHakoVCTtW8AaLGSwJ3LXqHD9uQKnOcv1TrpO6u2k= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13 h1:a+8/MLcWlIxo1lF9xaGt3J/u3yOZx+CdSveSNwjhD40= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13/go.mod h1:oGnKwIYZ4XttyU2JWxFrwvhF6YKiK/9/wmE3v3Iu9K8= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27/go.mod h1:UrHnn3QV/d0pBZ6QBAEQcqFLf8FAzLmoUfPVIueOvoM= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.12 h1:2lTWFvRcnWFFLzHWmtddu5MTchc5Oj2OOey++99tPZ0= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.12/go.mod h1:hI92pK+ho8HVcWMHKHrK3Uml4pfG7wvL86FzO0LVtQQ= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13 h1:HBSI2kDkMdWz4ZM7FjwE7e/pWDEZ+nR95x8Ztet1ooY= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13/go.mod h1:YE94ZoDArI7awZqJzBAZ3PDD2zSfuP7w6P2knOzIn8M= github.com/aws/aws-sdk-go-v2/internal/ini v1.3.34/go.mod h1:Etz2dj6UHYuw+Xw830KfzCfWGMzqvUTCjUj5b76GVDc= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 h1:WKuaxf++XKWlHWu9ECbMlha8WOEGm0OUEZqm4K/Gcfk= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4/go.mod h1:ZWy7j6v1vWGmPReu0iSGvRiise4YI5SkR3OHKTZ6Wuc= @@ -834,6 +834,8 @@ github.com/aws/aws-sdk-go-v2/service/athena v1.55.9 h1:w50cPLPIyWSzh4bqgA/h0nzRw github.com/aws/aws-sdk-go-v2/service/athena v1.55.9/go.mod h1:jTVF/+wNGjLD94jaJxDqhWexDeH7r4zZkQ7bbboAf1I= github.com/aws/aws-sdk-go-v2/service/bedrockruntime v1.42.1 h1:F/ZU3z+tNCIDhUD8wFEalX1GMdtU0SQlIXXi/hPFFpE= github.com/aws/aws-sdk-go-v2/service/bedrockruntime v1.42.1/go.mod h1:PfutSAwCVczCH5sBPjuPc1pkjaSokL4DsJNlrLC3kww= +github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.58.7 h1:Yj4NvoEEdSxA90x/uCBskzeF3OxZr72Yaf64n0fIVR4= +github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.58.7/go.mod h1:9/Q0/HtqBTLMksFse42wZjUq0jJrUuo4XlnXy/uSoeg= github.com/aws/aws-sdk-go-v2/service/dax v1.29.4 h1:IBtJf1olfwspLm3d20sDFstQTlHgL+dA2NtdXQMDius= github.com/aws/aws-sdk-go-v2/service/dax v1.29.4/go.mod h1:XciMFO7HCGFzOlvNl42HsWNakjQtt/JHzW26VzlahGI= github.com/aws/aws-sdk-go-v2/service/dynamodb v1.52.3 h1:28+obyib2FhFKASJ6qSPbuteiy0nvvcvfItdAAYure0= diff --git a/integrations/terraform/go.mod b/integrations/terraform/go.mod index 92e8daf896431..12f267c24ff96 100644 --- a/integrations/terraform/go.mod +++ b/integrations/terraform/go.mod @@ -88,8 +88,8 @@ require ( github.com/armon/go-radix v1.0.0 // indirect github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect github.com/aws/aws-sdk-go v1.55.8 // indirect - github.com/aws/aws-sdk-go-v2 v1.39.5 // indirect - github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2 // indirect + github.com/aws/aws-sdk-go-v2 v1.39.6 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3 // indirect github.com/aws/aws-sdk-go-v2/config v1.31.16 // indirect github.com/aws/aws-sdk-go-v2/credentials v1.18.20 // indirect github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.20.20 // indirect @@ -97,13 +97,14 @@ require ( github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.12 // indirect github.com/aws/aws-sdk-go-v2/feature/rds/auth v1.6.12 // indirect github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.2 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.12 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.12 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 // indirect github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.12 // indirect github.com/aws/aws-sdk-go-v2/service/applicationautoscaling v1.41.1 // indirect github.com/aws/aws-sdk-go-v2/service/athena v1.55.9 // indirect github.com/aws/aws-sdk-go-v2/service/bedrockruntime v1.42.1 // indirect + github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.58.7 // indirect github.com/aws/aws-sdk-go-v2/service/dax v1.29.4 // indirect github.com/aws/aws-sdk-go-v2/service/dynamodb v1.52.3 // indirect github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.32.1 // indirect diff --git a/integrations/terraform/go.sum b/integrations/terraform/go.sum index b0d666a3f86fe..b760f24919b37 100644 --- a/integrations/terraform/go.sum +++ b/integrations/terraform/go.sum @@ -816,10 +816,10 @@ github.com/aws/aws-sdk-go v1.49.12/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3Tj github.com/aws/aws-sdk-go v1.55.8 h1:JRmEUbU52aJQZ2AjX4q4Wu7t4uZjOu71uyNmaWlUkJQ= github.com/aws/aws-sdk-go v1.55.8/go.mod h1:ZkViS9AqA6otK+JBBNH2++sx1sgxrPKcSzPPvQkUtXk= github.com/aws/aws-sdk-go-v2 v1.18.0/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= -github.com/aws/aws-sdk-go-v2 v1.39.5 h1:e/SXuia3rkFtapghJROrydtQpfQaaUgd1cUvyO1mp2w= -github.com/aws/aws-sdk-go-v2 v1.39.5/go.mod h1:yWSxrnioGUZ4WVv9TgMrNUeLV3PFESn/v+6T/Su8gnM= -github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2 h1:t9yYsydLYNBk9cJ73rgPhPWqOh/52fcWDQB5b1JsKSY= -github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2/go.mod h1:IusfVNTmiSN3t4rhxWFaBAqn+mcNdwKtPcV16eYdgko= +github.com/aws/aws-sdk-go-v2 v1.39.6 h1:2JrPCVgWJm7bm83BDwY5z8ietmeJUbh3O2ACnn+Xsqk= +github.com/aws/aws-sdk-go-v2 v1.39.6/go.mod h1:c9pm7VwuW0UPxAEYGyTmyurVcNrbF6Rt/wixFqDhcjE= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3 h1:DHctwEM8P8iTXFxC/QK0MRjwEpWQeM9yzidCRjldUz0= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3/go.mod h1:xdCzcZEtnSTKVDOmUZs4l/j3pSV6rpo1WXl5ugNsL8Y= github.com/aws/aws-sdk-go-v2/config v1.18.25/go.mod h1:dZnYpD5wTW/dQF0rRNLVypB396zWCcPiBIvdvSWHEg4= github.com/aws/aws-sdk-go-v2/config v1.31.16 h1:E4Tz+tJiPc7kGnXwIfCyUj6xHJNpENlY11oKpRTgsjc= github.com/aws/aws-sdk-go-v2/config v1.31.16/go.mod h1:2S9hBElpCyGMifv14WxQ7EfPumgoeCPZUpuPX8VtW34= @@ -838,11 +838,11 @@ github.com/aws/aws-sdk-go-v2/feature/rds/auth v1.6.12/go.mod h1:T/o6k3LG7Ew45+Jz github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.2 h1:9/HxDeIgA7DcKK6e6ZaP5PQiXugYbNERx3Z5u30mN+k= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.2/go.mod h1:3N1RoxKNcVHmbOKVMMw8pvMs5TUhGYPQP/aq1zmAWqo= github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33/go.mod h1:7i0PF1ME/2eUPFcjkVIwq+DOygHEoK92t5cDqNgYbIw= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.12 h1:p/9flfXdoAnwJnuW9xHEAFY22R3A6skYkW19JFF9F+8= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.12/go.mod h1:ZTLHakoVCTtW8AaLGSwJ3LXqHD9uQKnOcv1TrpO6u2k= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13 h1:a+8/MLcWlIxo1lF9xaGt3J/u3yOZx+CdSveSNwjhD40= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13/go.mod h1:oGnKwIYZ4XttyU2JWxFrwvhF6YKiK/9/wmE3v3Iu9K8= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27/go.mod h1:UrHnn3QV/d0pBZ6QBAEQcqFLf8FAzLmoUfPVIueOvoM= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.12 h1:2lTWFvRcnWFFLzHWmtddu5MTchc5Oj2OOey++99tPZ0= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.12/go.mod h1:hI92pK+ho8HVcWMHKHrK3Uml4pfG7wvL86FzO0LVtQQ= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13 h1:HBSI2kDkMdWz4ZM7FjwE7e/pWDEZ+nR95x8Ztet1ooY= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13/go.mod h1:YE94ZoDArI7awZqJzBAZ3PDD2zSfuP7w6P2knOzIn8M= github.com/aws/aws-sdk-go-v2/internal/ini v1.3.34/go.mod h1:Etz2dj6UHYuw+Xw830KfzCfWGMzqvUTCjUj5b76GVDc= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 h1:WKuaxf++XKWlHWu9ECbMlha8WOEGm0OUEZqm4K/Gcfk= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4/go.mod h1:ZWy7j6v1vWGmPReu0iSGvRiise4YI5SkR3OHKTZ6Wuc= @@ -854,6 +854,8 @@ github.com/aws/aws-sdk-go-v2/service/athena v1.55.9 h1:w50cPLPIyWSzh4bqgA/h0nzRw github.com/aws/aws-sdk-go-v2/service/athena v1.55.9/go.mod h1:jTVF/+wNGjLD94jaJxDqhWexDeH7r4zZkQ7bbboAf1I= github.com/aws/aws-sdk-go-v2/service/bedrockruntime v1.42.1 h1:F/ZU3z+tNCIDhUD8wFEalX1GMdtU0SQlIXXi/hPFFpE= github.com/aws/aws-sdk-go-v2/service/bedrockruntime v1.42.1/go.mod h1:PfutSAwCVczCH5sBPjuPc1pkjaSokL4DsJNlrLC3kww= +github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.58.7 h1:Yj4NvoEEdSxA90x/uCBskzeF3OxZr72Yaf64n0fIVR4= +github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.58.7/go.mod h1:9/Q0/HtqBTLMksFse42wZjUq0jJrUuo4XlnXy/uSoeg= github.com/aws/aws-sdk-go-v2/service/dax v1.29.4 h1:IBtJf1olfwspLm3d20sDFstQTlHgL+dA2NtdXQMDius= github.com/aws/aws-sdk-go-v2/service/dax v1.29.4/go.mod h1:XciMFO7HCGFzOlvNl42HsWNakjQtt/JHzW26VzlahGI= github.com/aws/aws-sdk-go-v2/service/dynamodb v1.52.3 h1:28+obyib2FhFKASJ6qSPbuteiy0nvvcvfItdAAYure0= From 53da2e5239feeca388df5461bdaf3ca0da2684fb Mon Sep 17 00:00:00 2001 From: Cam Hutchison Date: Mon, 10 Nov 2025 19:22:29 +1100 Subject: [PATCH 4/5] discovery: Refactor eks audit log fetching for testing Refactor the eksAuditLog{Watcher,Fetcher} and the aws_sync.Fetcher cloudwatchlogs to be more testable: * factor away eksAuditLogFetcher from eksAuditLogWatcher. The watcher just needs a factory function to create a fetcher, and all the watcher needs from that fetcher is a `Run()` method. Lift the cancel func out of the watcher and store it directly in the watcher, as only the watcher uses it. * factor away aws_sync.Fetcher from eksAuditLogFetcher. All it needs from the sync fetcher it calls is one method to fetch cloudwatch logs. Make that an interface and use just that. This allows a fake source of cloudwatch logs to be provided for testing. While here, use protobuf getters rather than accessing fields directly. * Use protobuf getters in aws_sync.Fetcher cloudwatchlogs instead of accessing fields directly. In future, we could pass in an interface with those getters to make the code more testable. --- lib/srv/discovery/eks_audit_log_fetcher.go | 31 +++++++++++-- lib/srv/discovery/eks_audit_log_watcher.go | 46 +++++++++++++------ .../fetchers/aws-sync/cloudwatchlogs.go | 12 ++--- 3 files changed, 64 insertions(+), 25 deletions(-) diff --git a/lib/srv/discovery/eks_audit_log_fetcher.go b/lib/srv/discovery/eks_audit_log_fetcher.go index 017d140a74762..83a8f778d6a61 100644 --- a/lib/srv/discovery/eks_audit_log_fetcher.go +++ b/lib/srv/discovery/eks_audit_log_fetcher.go @@ -31,19 +31,42 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" accessgraphv1alpha "github.com/gravitational/teleport/gen/proto/go/accessgraph/v1alpha" - aws_sync "github.com/gravitational/teleport/lib/srv/discovery/fetchers/aws-sync" ) +// cloudwatchlogFetcher fetches cloudwatch logs for a given cluster, starting +// at the given cursur position. This interface exists so tests can plug in a +// fake fetcher and not need to stub out deeper AWS interfaces. +type cloudwatchlogFetcher interface { + FetchEKSAuditLogs( + ctx context.Context, + cluster *accessgraphv1alpha.AWSEKSClusterV1, + cursor *accessgraphv1alpha.KubeAuditLogCursor, + ) ([]cwltypes.FilteredLogEvent, error) +} + // eksAuditLogFetcher is a fetcher for EKS audit logs for a single cluster, // fetching the logs from AWS Cloud Watch Logs. It uses the grpc stream // to initiate the stream and possibly receive a resume state used to // synchronize the start point with a previous run fetching the logs. type eksAuditLogFetcher struct { - fetcher *aws_sync.Fetcher + fetcher cloudwatchlogFetcher cluster *accessgraphv1alpha.AWSEKSClusterV1 stream accessgraphv1alpha.AccessGraphService_KubeAuditLogStreamClient log *slog.Logger - cancel context.CancelFunc +} + +func newEKSAuditLogFetcher( + fetcher cloudwatchlogFetcher, + cluster *accessgraphv1alpha.AWSEKSClusterV1, + stream accessgraphv1alpha.AccessGraphService_KubeAuditLogStreamClient, + log *slog.Logger, +) *eksAuditLogFetcher { + return &eksAuditLogFetcher{ + fetcher: fetcher, + cluster: cluster, + stream: stream, + log: log, + } } // Run continuously polls AWS Cloud Watch Logs for Kubernetes apiserver @@ -79,7 +102,7 @@ func (f *eksAuditLogFetcher) Run(ctx context.Context) error { } f.log.DebugContext(ctx, "Sent KubeAuditLogEvents", "count", len(events), - "cursor_time", cursor.LastEventTime.AsTime()) + "cursor_time", cursor.GetLastEventTime().AsTime()) } return trace.Wrap(ctx.Err()) } diff --git a/lib/srv/discovery/eks_audit_log_watcher.go b/lib/srv/discovery/eks_audit_log_watcher.go index 3b4059b48ec39..55e845a48baf3 100644 --- a/lib/srv/discovery/eks_audit_log_watcher.go +++ b/lib/srv/discovery/eks_audit_log_watcher.go @@ -54,6 +54,10 @@ type eksAuditLogCluster struct { cluster *accessgraphv1alpha.AWSEKSClusterV1 } +type eksAuditLogFetcherRunner interface { + Run(context.Context) error +} + // eksAuditLogWatcher is a watcher that waits for notifications on a channel // indicating what EKS clusters should have audit logs fetched, and reconciles // that against what is currently being fetched. Fetchers are started and @@ -63,8 +67,20 @@ type eksAuditLogWatcher struct { log *slog.Logger auditLogClustersCh chan []eksAuditLogCluster - fetchers map[string]*eksAuditLogFetcher + // Fetchers tracks the cluster IDs (ARN) of the clusters for which we + // have a fetcher running. The value is a CancelFunc that is called to + // stop the fetcher. + fetchers map[string]context.CancelFunc completedCh chan fetcherCompleted + + // newFetcher is a function used to construct a new fetcher. It exists + // so tests can override it to not create real fetchers. + newFetcher func( + *aws_sync.Fetcher, + *accessgraphv1alpha.AWSEKSClusterV1, + accessgraphv1alpha.AccessGraphService_KubeAuditLogStreamClient, + *slog.Logger, + ) eksAuditLogFetcherRunner } func newEKSAuditLogWatcher( @@ -75,15 +91,15 @@ func newEKSAuditLogWatcher( client: client, log: logger, auditLogClustersCh: make(chan []eksAuditLogCluster), - fetchers: make(map[string]*eksAuditLogFetcher), + fetchers: make(map[string]context.CancelFunc), completedCh: make(chan fetcherCompleted), } } // fetcherCompleted captures the result of a completed eksAuditLogFetcher. type fetcherCompleted struct { - fetcher *eksAuditLogFetcher - err error + clusterID string + err error } // Run starts a watcher by creating a KubeAuditLogStream on its grpc client. It @@ -168,27 +184,27 @@ func (w *eksAuditLogWatcher) reconcile(ctx context.Context, clusters []eksAuditL discoveredClusters[discovered.cluster.Arn] = discovered } // Stop any fetchers for clusters we are running fetcher for that discovery did not return. - for arn, existing := range mapDifference(w.fetchers, discoveredClusters) { + for arn, fetcherCancel := range mapDifference(w.fetchers, discoveredClusters) { w.log.InfoContext(ctx, "Stopping eksKubeAuditLogFetcher", "cluster", arn) - existing.cancel() + fetcherCancel() // cleanup will happen when the fetcher finishes and is put on the completed channel. } // Start any new fetchers for clusters we are not running that discovery returned. for arn, discovered := range mapDifference(discoveredClusters, w.fetchers) { w.log.InfoContext(ctx, "Starting eksKubeAuditLogFetcher", "cluster", arn) ctx, cancel := context.WithCancel(ctx) - logFetcher := &eksAuditLogFetcher{ - fetcher: discovered.fetcher, - cluster: discovered.cluster, - stream: stream, - log: w.log, - cancel: cancel, + var logFetcher eksAuditLogFetcherRunner + if w.newFetcher == nil { + logFetcher = newEKSAuditLogFetcher(discovered.fetcher, discovered.cluster, stream, w.log) + } else { + // the pluggable newFetcher is for testing purposes + logFetcher = w.newFetcher(discovered.fetcher, discovered.cluster, stream, w.log) } - w.fetchers[arn] = logFetcher + w.fetchers[arn] = cancel go func() { err := logFetcher.Run(ctx) select { - case w.completedCh <- fetcherCompleted{logFetcher, err}: + case w.completedCh <- fetcherCompleted{arn, err}: case <-ctx.Done(): } }() @@ -198,7 +214,7 @@ func (w *eksAuditLogWatcher) reconcile(ctx context.Context, clusters []eksAuditL // complete cleans up the maintained list of running log fetchers, removing the // given completed fetcher, and logs the completion status of the fetcher. func (w *eksAuditLogWatcher) complete(ctx context.Context, completed fetcherCompleted) { - arn := completed.fetcher.cluster.Arn + arn := completed.clusterID if completed.err != nil && !errors.Is(completed.err, context.Canceled) { w.log.ErrorContext(ctx, "eksKubeAuditLogFetcher completed with error", "cluster", arn, "error", completed.err) } else { diff --git a/lib/srv/discovery/fetchers/aws-sync/cloudwatchlogs.go b/lib/srv/discovery/fetchers/aws-sync/cloudwatchlogs.go index 6800b75b2de2e..f2fa18c975a1c 100644 --- a/lib/srv/discovery/fetchers/aws-sync/cloudwatchlogs.go +++ b/lib/srv/discovery/fetchers/aws-sync/cloudwatchlogs.go @@ -33,7 +33,7 @@ import ( // FetchEKSAuditLogs returns a slice of audit log events for the given cluster // starting from the given cursor. func (a *Fetcher) FetchEKSAuditLogs(ctx context.Context, cluster *accessgraphv1alpha.AWSEKSClusterV1, cursor *accessgraphv1alpha.KubeAuditLogCursor) ([]cwltypes.FilteredLogEvent, error) { - cfg, err := a.AWSConfigProvider.GetConfig(ctx, cluster.Region, a.getAWSOptions()...) + cfg, err := a.AWSConfigProvider.GetConfig(ctx, cluster.GetRegion(), a.getAWSOptions()...) if err != nil { return nil, trace.Wrap(err) } @@ -42,9 +42,9 @@ func (a *Fetcher) FetchEKSAuditLogs(ctx context.Context, cluster *accessgraphv1a // limit is not a hard limit - we may exceed it but won't get any more pages // once reached. var limit int32 = 500 // TODO(camscale): Consider making this a parameter - startTime := cursor.LastEventTime.AsTime().UTC() + startTime := cursor.GetLastEventTime().AsTime().UTC() input := &cloudwatchlogs.FilterLogEventsInput{ - LogGroupName: aws.String("/aws/eks/" + cluster.Name + "/cluster"), + LogGroupName: aws.String("/aws/eks/" + cluster.GetName() + "/cluster"), LogStreamNamePrefix: aws.String("kube-apiserver-audit-"), StartTime: aws.Int64(startTime.UnixMilli()), Limit: aws.Int32(limit), @@ -80,18 +80,18 @@ func (a *Fetcher) FetchEKSAuditLogs(ctx context.Context, cluster *accessgraphv1a // If the cursor is nil, the events are returned unfiltered. func cwlEventsAfterCursor(events []cwltypes.FilteredLogEvent, cursor *accessgraphv1alpha.KubeAuditLogCursor) []cwltypes.FilteredLogEvent { // If we're not looking for events from a cursor position, just return all events. - if cursor == nil || cursor.EventId == "" { + if cursor == nil || cursor.GetEventId() == "" { return events } - startTime := cursor.LastEventTime.AsTime().UTC() + startTime := cursor.GetLastEventTime().AsTime().UTC() for i, event := range events { // If we never saw cursor.EventId with the given timestamp, // just return all the events. if time.UnixMilli(*event.Timestamp).UTC().After(startTime) { return events } - if *event.EventId == cursor.EventId { + if *event.EventId == cursor.GetEventId() { return events[i+1:] } } From a2f99a222690863d008bf511191adf110d085c76 Mon Sep 17 00:00:00 2001 From: Cam Hutchison Date: Mon, 10 Nov 2025 19:27:55 +1100 Subject: [PATCH 5/5] discovery: Add eks audit log tests Add tests for `eksAuditLogWatcher` and `eksAuditLogFetcher`. Copy the grpc stream testing util from the access graph repo into teleport as it is useful for the bidirectional streaming methods uses by access graph, and makes it easier to test on the client side. --- .../discovery/eks_audit_log_fetcher_test.go | 245 ++++++++++++++++++ .../discovery/eks_audit_log_watcher_test.go | 210 +++++++++++++++ lib/utils/testutils/grpctest/grpc.go | 146 +++++++++++ 3 files changed, 601 insertions(+) create mode 100644 lib/srv/discovery/eks_audit_log_fetcher_test.go create mode 100644 lib/srv/discovery/eks_audit_log_watcher_test.go create mode 100644 lib/utils/testutils/grpctest/grpc.go diff --git a/lib/srv/discovery/eks_audit_log_fetcher_test.go b/lib/srv/discovery/eks_audit_log_fetcher_test.go new file mode 100644 index 0000000000000..bbf792daf4f8f --- /dev/null +++ b/lib/srv/discovery/eks_audit_log_fetcher_test.go @@ -0,0 +1,245 @@ +/* + * Teleport + * Copyright (C) 2025 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package discovery + +import ( + "context" + "errors" + "fmt" + "log/slog" + "testing" + "testing/synctest" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + cwltypes "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types" + "github.com/stretchr/testify/require" + + accessgraphv1alpha "github.com/gravitational/teleport/gen/proto/go/accessgraph/v1alpha" + "github.com/gravitational/teleport/lib/utils/testutils/grpctest" +) + +type eksAuditLogFetcherFixture struct { + ctx context.Context + cancel context.CancelFunc + server kalsServer + fetcherErr error + cluster *accessgraphv1alpha.AWSEKSClusterV1 + fakeLogFetcher *fakeCloudWatchLogFetcher +} + +// Start the fixture. Must be called inside synctest bubble. +func (f *eksAuditLogFetcherFixture) Start(t *testing.T) { + t.Helper() + + f.ctx, f.cancel = context.WithCancel(t.Context()) + tester := grpctest.NewGRPCTester[kalsRequest, kalsResponse](f.ctx) + f.server = tester.NewServerStream() + logger := slog.New(slog.DiscardHandler) + f.fakeLogFetcher = newFakeCloudWatchLogFetcher() + f.cluster = &accessgraphv1alpha.AWSEKSClusterV1{ + Name: "cluster-name", + Arn: "cluster-arn", + } + logFetcher := newEKSAuditLogFetcher(f.fakeLogFetcher, f.cluster, tester.NewClientStream(), logger) + go func() { f.fetcherErr = logFetcher.Run(f.ctx) }() +} + +// End the fixture. Must be called inside synctest bubble. +func (f *eksAuditLogFetcherFixture) End(t *testing.T) { + t.Helper() + f.cancel() + synctest.Wait() + require.ErrorIs(t, f.fetcherErr, context.Canceled) +} + +func (f *eksAuditLogFetcherFixture) testInitializeNewStream(t *testing.T) { + t.Helper() + + // Wait for a NewStream action, and verify it contains what we expect + msg, err := f.server.Recv() + require.NoError(t, err) + newStream := msg.GetNewStream() + require.NotNil(t, newStream) + cursor := newStream.GetInitial() + require.NotNil(t, cursor) + require.Equal(t, accessgraphv1alpha.KubeAuditLogCursor_KUBE_AUDIT_LOG_SOURCE_EKS, cursor.GetLogSource()) + require.Equal(t, f.cluster.GetArn(), cursor.GetClusterId()) + + // Send back a ResumeState + err = f.server.Send(newKubeAuditLogResponseResumeState(cursor)) + require.NoError(t, err) +} + +// TestEKSAuditLogFetcher_NewStream_Unknown tests that when a new log stream +// is set up for a cluster, logs start being fetched from the cursor returned +// by the grpc service. +func TestEKSAuditLogFetcher_NewStream(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + f := &eksAuditLogFetcherFixture{} + f.Start(t) + f.testInitializeNewStream(t) + f.End(t) + }) +} + +func TestEKSAuditLogFetcher_Batching(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + startTime := time.Now().UTC() + logEpoch := startTime.Add(-7 * 24 * time.Hour) + f := &eksAuditLogFetcherFixture{} + f.Start(t) + f.testInitializeNewStream(t) + + f.fakeLogFetcher.events <- nil + // Wait for a polling loop to occur. As there are no logs left, + // the time should now be the synctest epoch plus the poll interval + time.Sleep(logPollInterval) + synctest.Wait() + require.Equal(t, startTime.Add(logPollInterval), time.Now().UTC()) + + // Wait for an Events action with the log listed. Verify the log and cursor. + f.fakeLogFetcher.events <- []cwltypes.FilteredLogEvent{ + makeEvent(logEpoch, 0, "{}"), + makeEvent(logEpoch.Add(time.Second), 1, `{"log": "value"}`), + } + msg, err := f.server.Recv() + require.NoError(t, err) + events := msg.GetEvents() + require.NotNil(t, events) + require.Len(t, events.GetEvents(), 2) + require.Empty(t, events.GetEvents()[0].GetFields()) + require.Len(t, events.GetEvents()[1].GetFields(), 1) + cursor := events.GetCursor() + require.NotNil(t, cursor) + require.Equal(t, accessgraphv1alpha.KubeAuditLogCursor_KUBE_AUDIT_LOG_SOURCE_EKS, cursor.GetLogSource()) + require.Equal(t, f.cluster.GetArn(), cursor.GetClusterId()) + require.Equal(t, "event-id-1", cursor.GetEventId()) + require.Equal(t, logEpoch.Add(time.Second), cursor.GetLastEventTime().AsTime()) + + f.fakeLogFetcher.events <- []cwltypes.FilteredLogEvent{ + makeEvent(logEpoch.Add(time.Second), 2, `{"log": "value2"}`), + makeEvent(logEpoch.Add(2*time.Second), 3, `{}`), + } + msg, err = f.server.Recv() + require.NoError(t, err) + events = msg.GetEvents() + require.NotNil(t, events) + require.Len(t, events.GetEvents(), 2) + require.Len(t, events.GetEvents()[0].GetFields(), 1) + require.Empty(t, events.GetEvents()[1].GetFields()) + cursor = events.GetCursor() + require.NotNil(t, cursor) + require.Equal(t, accessgraphv1alpha.KubeAuditLogCursor_KUBE_AUDIT_LOG_SOURCE_EKS, cursor.GetLogSource()) + require.Equal(t, f.cluster.GetArn(), cursor.GetClusterId()) + require.Equal(t, "event-id-3", cursor.GetEventId()) + require.Equal(t, logEpoch.Add(2*time.Second), cursor.GetLastEventTime().AsTime()) + + f.End(t) + }) +} + +func TestEKSAuditLogFetcher_ContinueOnError(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + startTime := time.Now().UTC() + logEpoch := startTime.Add(-7 * 24 * time.Hour) + f := &eksAuditLogFetcherFixture{} + f.Start(t) + f.testInitializeNewStream(t) + + f.fakeLogFetcher.err <- errors.New("oh noes. something went wrong") + // Wait for a polling loop to occur. As there are no logs left, + // the time should now be the synctest epoch plus the poll interval + time.Sleep(logPollInterval) + synctest.Wait() + require.Equal(t, startTime.Add(logPollInterval), time.Now().UTC()) + + // Wait for an Events action with the log listed. Verify the log and cursor. + f.fakeLogFetcher.events <- []cwltypes.FilteredLogEvent{ + makeEvent(logEpoch, 0, "{}"), + makeEvent(logEpoch.Add(time.Second), 1, `{"log": "value"}`), + } + msg, err := f.server.Recv() + require.NoError(t, err) + events := msg.GetEvents() + require.NotNil(t, events) + require.Len(t, events.GetEvents(), 2) + require.Empty(t, events.GetEvents()[0].GetFields()) + require.Len(t, events.GetEvents()[1].GetFields(), 1) + cursor := events.GetCursor() + require.NotNil(t, cursor) + require.Equal(t, accessgraphv1alpha.KubeAuditLogCursor_KUBE_AUDIT_LOG_SOURCE_EKS, cursor.GetLogSource()) + require.Equal(t, f.cluster.GetArn(), cursor.GetClusterId()) + require.Equal(t, "event-id-1", cursor.GetEventId()) + require.Equal(t, logEpoch.Add(time.Second), cursor.GetLastEventTime().AsTime()) + + f.End(t) + }) +} + +func newKubeAuditLogResponseResumeState(cursor *accessgraphv1alpha.KubeAuditLogCursor) *kalsResponse { + return &kalsResponse{ + State: &accessgraphv1alpha.KubeAuditLogStreamResponse_ResumeState{ + ResumeState: &accessgraphv1alpha.KubeAuditLogResumeState{ + Cursor: cursor, + }, + }, + } +} + +func makeEvent(t time.Time, id int, msg string) cwltypes.FilteredLogEvent { + return cwltypes.FilteredLogEvent{ + EventId: aws.String(fmt.Sprintf("event-id-%d", id)), + IngestionTime: aws.Int64(t.UnixMilli()), + Timestamp: aws.Int64(t.UnixMilli()), + LogStreamName: aws.String("kube-apiserver-audit-12345678"), + Message: aws.String(msg), + } +} + +func newFakeCloudWatchLogFetcher() *fakeCloudWatchLogFetcher { + return &fakeCloudWatchLogFetcher{ + events: make(chan []cwltypes.FilteredLogEvent), + err: make(chan error), + } +} + +// fakeCloudWatchLogFetcher is a cloudwatch log fetcher that waits on channels +// for the data to return. This allows the unit under test to rendezvous with +// the tests, allowing the tests to advance the state of the fetcher as it +// needs. +type fakeCloudWatchLogFetcher struct { + events chan []cwltypes.FilteredLogEvent + err chan error +} + +func (f *fakeCloudWatchLogFetcher) FetchEKSAuditLogs( + ctx context.Context, + cluster *accessgraphv1alpha.AWSEKSClusterV1, + cursor *accessgraphv1alpha.KubeAuditLogCursor, +) ([]cwltypes.FilteredLogEvent, error) { + select { + case events := <-f.events: + return events, nil + case err := <-f.err: + return nil, err + case <-ctx.Done(): + return nil, ctx.Err() + } +} diff --git a/lib/srv/discovery/eks_audit_log_watcher_test.go b/lib/srv/discovery/eks_audit_log_watcher_test.go new file mode 100644 index 0000000000000..4d3aa5719641c --- /dev/null +++ b/lib/srv/discovery/eks_audit_log_watcher_test.go @@ -0,0 +1,210 @@ +/* + * Teleport + * Copyright (C) 2025 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package discovery + +import ( + "context" + "log/slog" + "testing" + "testing/synctest" + + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + + accessgraphv1alpha "github.com/gravitational/teleport/gen/proto/go/accessgraph/v1alpha" + aws_sync "github.com/gravitational/teleport/lib/srv/discovery/fetchers/aws-sync" + "github.com/gravitational/teleport/lib/utils/testutils/grpctest" +) + +type ( + kalsRequest = accessgraphv1alpha.KubeAuditLogStreamRequest + kalsResponse = accessgraphv1alpha.KubeAuditLogStreamResponse + kalsClient = grpc.BidiStreamingClient[kalsRequest, kalsResponse] + kalsServer = grpc.BidiStreamingServer[kalsRequest, kalsResponse] +) + +func TestEKSAuditLogWatcher_Init(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + + client := newFakeKubeAuditLogClient(ctx) + watcher := newEKSAuditLogWatcher(client, slog.New(slog.DiscardHandler)) + var err error + go func() { err = watcher.Run(ctx) }() + + // Receive a config request. + req, err := client.serverStream.Recv() + require.NoError(t, err) + require.NotNil(t, req.GetConfig()) + + // Send back a config response with the config we received. + err = client.serverStream.Send(newKubeAuditLogResponseConfig(req.GetConfig())) + require.NoError(t, err) + + cancel() + synctest.Wait() + require.ErrorIs(t, err, context.Canceled) + }) +} + +func TestEKSAuditLogWatcher_Reconcile(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + + fetcherTracker := newFakeFetcherTracker() + client := newFakeKubeAuditLogClient(ctx) + watcher := newEKSAuditLogWatcher(client, slog.New(slog.DiscardHandler)) + watcher.newFetcher = fetcherTracker.newFetcher + var err error + go func() { err = watcher.Run(ctx) }() + + // Receive a config request. + req, err := client.serverStream.Recv() + require.NoError(t, err) + require.NotNil(t, req.GetConfig()) + + // Send back a config response with the config we received. + err = client.serverStream.Send(newKubeAuditLogResponseConfig(req.GetConfig())) + require.NoError(t, err) + + // Send a single cluster1 to the watcher to reconcile + cluster1 := &accessgraphv1alpha.AWSEKSClusterV1{Arn: "test-arn"} + fetcher1 := &aws_sync.Fetcher{} + watcher.Reconcile(ctx, []eksAuditLogCluster{{fetcher1, cluster1}}) + synctest.Wait() + + // Verify that a fetcher was started. + f1, ok := fetcherTracker.fetchers["test-arn"] + require.True(t, ok, "eksAuditLogFetcher not in fetcherTracker") + require.True(t, f1.runCalled, "fetcher Run() was not called") + require.Same(t, fetcher1, f1.fetcher) + require.Same(t, cluster1, f1.cluster) + require.Len(t, fetcherTracker.fetchers, 1) + require.Equal(t, 1, fetcherTracker.newCount) + + // Add another cluster + cluster2 := &accessgraphv1alpha.AWSEKSClusterV1{Arn: "test-arn2"} + fetcher2 := &aws_sync.Fetcher{} + watcher.Reconcile(ctx, []eksAuditLogCluster{{fetcher1, cluster1}, {fetcher2, cluster2}}) + synctest.Wait() + + // Verify that a fetcher was started. + f2, ok := fetcherTracker.fetchers["test-arn2"] + require.True(t, ok, "eksAuditLogFetcher not in fetcherTracker") + require.True(t, f2.runCalled, "fetcher Run() was not called") + require.Same(t, fetcher2, f2.fetcher) + require.Same(t, cluster2, f2.cluster) + require.Len(t, fetcherTracker.fetchers, 2) + require.Equal(t, 2, fetcherTracker.newCount) + + // Drop back to a single cluster + watcher.Reconcile(ctx, []eksAuditLogCluster{{fetcher1, cluster1}}) + synctest.Wait() + require.Len(t, fetcherTracker.fetchers, 1) + require.Equal(t, 2, fetcherTracker.newCount) + require.True(t, f2.done) + + // Send an empty cluster list. Should stop last fetcher + watcher.Reconcile(ctx, []eksAuditLogCluster{}) + synctest.Wait() + require.Empty(t, fetcherTracker.fetchers) + require.Equal(t, 2, fetcherTracker.newCount) + require.True(t, f1.done) + + cancel() + synctest.Wait() + require.ErrorIs(t, err, context.Canceled) + }) +} + +// fakeFetcherTracker keeps track of the fetchers created by an +// eksAuditLogWatcher. It has a newFetcher method that can plug into a watcher +// so that real fetchers are not created, and returns a fake fetcher for +// testing purposes. +type fakeFetcherTracker struct { + fetchers map[string]*fakeEksAuditLogFetcher + newCount int +} + +func newFakeFetcherTracker() *fakeFetcherTracker { + return &fakeFetcherTracker{fetchers: make(map[string]*fakeEksAuditLogFetcher)} +} + +// newFetcher plugs into eksAuditLogWatcher.newFetcher so that it creates fake +// fetchers for testing purposes. We do not need real fetchers to test the +// watcher. +func (fft *fakeFetcherTracker) newFetcher( + fetcher *aws_sync.Fetcher, + cluster *accessgraphv1alpha.AWSEKSClusterV1, + stream accessgraphv1alpha.AccessGraphService_KubeAuditLogStreamClient, + log *slog.Logger, +) eksAuditLogFetcherRunner { + f := &fakeEksAuditLogFetcher{ + fetcher: fetcher, + cluster: cluster, + cleanup: func() { delete(fft.fetchers, cluster.Arn) }, + } + fft.fetchers[cluster.Arn] = f + fft.newCount++ + return f +} + +type fakeEksAuditLogFetcher struct { + fetcher *aws_sync.Fetcher + cluster *accessgraphv1alpha.AWSEKSClusterV1 + cleanup func() + runCalled bool + done bool +} + +func (f *fakeEksAuditLogFetcher) Run(ctx context.Context) error { + f.runCalled = true // used in synctest bubble, no race + <-ctx.Done() + f.done = true + f.cleanup() + return ctx.Err() +} + +func newKubeAuditLogResponseConfig(cfg *accessgraphv1alpha.KubeAuditLogConfig) *kalsResponse { + return &kalsResponse{ + State: &accessgraphv1alpha.KubeAuditLogStreamResponse_Config{ + Config: cfg, + }, + } +} + +func newFakeKubeAuditLogClient(ctx context.Context) *fakeKubeAuditLogClient { + tester := grpctest.NewGRPCTester[kalsRequest, kalsResponse](ctx) + return &fakeKubeAuditLogClient{ + clientStream: tester.NewClientStream(), + serverStream: tester.NewServerStream(), + } +} + +type fakeKubeAuditLogClient struct { + accessgraphv1alpha.AccessGraphServiceClient + + clientStream kalsClient + serverStream kalsServer +} + +// Implements KubeAuditLogStream grpc method on the client +func (c *fakeKubeAuditLogClient) KubeAuditLogStream(ctx context.Context, opts ...grpc.CallOption) (kalsClient, error) { + return c.clientStream, nil +} diff --git a/lib/utils/testutils/grpctest/grpc.go b/lib/utils/testutils/grpctest/grpc.go new file mode 100644 index 0000000000000..5664b2c7dc1d8 --- /dev/null +++ b/lib/utils/testutils/grpctest/grpc.go @@ -0,0 +1,146 @@ +/* + * Teleport + * Copyright (C) 2025 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package grpctest + +import ( + "context" + "io" + + "google.golang.org/grpc" +) + +// NewGRPCStreams creates a new bidirectional streaming gRPC pair, a bidirectional +// streaming client and server, for the given request type T1 and response type +// T2. +// +// The streams are directly connected without the use of network and are +// therefore suitable to be used with synctest. +// +// The client sends its requests on the clientStream which are directly received +// by the server via a channel with buffer size 1. The server sends its +// responses on its serverStream via another channel with buffer size 1. +// +// Private fields are purposefully written in a *not* concurrency-safe manner to +// simulate non-concurrency safety of real over-the-network GRPC stream. It will +// be caught when executing the test with the race detector enbled. +func NewGRPCTester[T1, T2 any](ctx context.Context) *GRPCTester[T1, T2] { + return &GRPCTester[T1, T2]{ + ctx: ctx, + toServer: make(chan *T1, 1), + toClient: make(chan *T2, 1), + } +} + +type GRPCTester[T1, T2 any] struct { + ctx context.Context + toServer chan *T1 + toClient chan *T2 +} + +func (t *GRPCTester[T1, T2]) NewClientStream() grpc.BidiStreamingClient[T1, T2] { + return &client[T1, T2]{ + ctx: t.ctx, + toServer: t.toServer, + toClient: t.toClient, + } +} + +func (t *GRPCTester[T1, T2]) NewServerStream() grpc.BidiStreamingServer[T1, T2] { + return &server[T1, T2]{ + ctx: t.ctx, + toServer: t.toServer, + toClient: t.toClient, + } +} + +type client[T1, T2 any] struct { + grpc.ClientStream + ctx context.Context + toServer chan *T1 + toClient chan *T2 + // simulate non-concurrency safety + sendRaceDetector bool + receiveRaceDetector bool +} + +func (c *client[T1, T2]) Context() context.Context { + return c.ctx +} + +func (c *client[T1, T2]) Send(req *T1) error { + c.sendRaceDetector = true // simulate non-concurrency safety + select { + case c.toServer <- req: + return nil + case <-c.ctx.Done(): + return c.ctx.Err() + } +} + +func (c *client[T1, T2]) Recv() (*T2, error) { + c.receiveRaceDetector = true // simulate non-concurrency safety + select { + case resp := <-c.toClient: + return resp, nil + case <-c.ctx.Done(): + return nil, c.ctx.Err() + } +} + +func (c *client[T1, T2]) CloseSend() error { + close(c.toServer) + return nil +} + +type server[T1, T2 any] struct { + grpc.ServerStream + ctx context.Context + toServer chan *T1 + toClient chan *T2 + // simulate non-concurrency safety + sendRaceDetector bool + receiveRaceDetector bool +} + +func (s *server[T1, T2]) Context() context.Context { + return s.ctx +} + +func (s *server[T1, T2]) Send(resp *T2) error { + s.sendRaceDetector = true + select { + case s.toClient <- resp: + return nil + case <-s.ctx.Done(): + return s.ctx.Err() + } +} + +func (s *server[T1, T2]) Recv() (*T1, error) { + s.receiveRaceDetector = true + select { + case req, ok := <-s.toServer: + if !ok { + return nil, io.EOF + } + return req, nil + case <-s.ctx.Done(): + return nil, s.ctx.Err() + } +}