Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 44 additions & 6 deletions lib/events/athena/athena.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

"github.com/gravitational/teleport"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/api/types/externalcloudaudit"
"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/observability/metrics"
Expand Down Expand Up @@ -116,9 +117,22 @@ type Config struct {
UIDGenerator utils.UID
// LogEntry is a log entry.
LogEntry *log.Entry
// AWSConfig is AWS config which can be used to construct various AWS Clients
// using aws-sdk-go-v2.
AWSConfig *aws.Config

// PublisherConsumerAWSConfig is an AWS config which can be used to
// construct AWS Clients using aws-sdk-go-v2, used by the publisher and
// consumer components which publish/consume events from SQS (and S3 for
// large events). These are always hosted on Teleport cloud infra even when
// External Cloud Audit is enabled; any events written here are only held
// temporarily while they are queued to write to s3 parquet files in
// batches.
PublisherConsumerAWSConfig *aws.Config

// StorerQuerierAWSConfig is an AWS config which can be used to construct AWS Clients
// using aws-sdk-go-v2, used by the consumer (store phase) and the querier.
// Often it is the same as PublisherConsumerAWSConfig unless External Cloud
// Audit is enabled, then this will authenticate and connect to
// external/customer AWS account.
StorerQuerierAWSConfig *aws.Config

Backend backend.Backend

Expand Down Expand Up @@ -246,7 +260,7 @@ func (cfg *Config) CheckAndSetDefaults(ctx context.Context) error {
})
}

if cfg.AWSConfig == nil {
if cfg.PublisherConsumerAWSConfig == nil {
awsCfg, err := awsconfig.LoadDefaultConfig(ctx)
if err != nil {
return trace.Wrap(err)
Expand All @@ -257,7 +271,11 @@ func (cfg *Config) CheckAndSetDefaults(ctx context.Context) error {
awsCfg.Region = cfg.Region
}
otelaws.AppendMiddlewares(&awsCfg.APIOptions)
cfg.AWSConfig = &awsCfg
cfg.PublisherConsumerAWSConfig = &awsCfg
}

if cfg.StorerQuerierAWSConfig == nil {
cfg.StorerQuerierAWSConfig = cfg.PublisherConsumerAWSConfig
}

if cfg.Backend == nil {
Expand Down Expand Up @@ -367,6 +385,26 @@ func (cfg *Config) SetFromURL(url *url.URL) error {
return nil
}

func (cfg *Config) UpdateForExternalCloudAudit(ctx context.Context, spec *externalcloudaudit.ExternalCloudAuditSpec, credentialsProvider aws.CredentialsProvider) error {
Comment thread
nklaassen marked this conversation as resolved.
cfg.LocationS3 = spec.AuditEventsLongTermURI
cfg.Workgroup = spec.AthenaWorkgroup
cfg.QueryResultsS3 = spec.AthenaResultsURI
cfg.Database = spec.GlueDatabase
cfg.TableName = spec.GlueTable

awsCfg, err := awsconfig.LoadDefaultConfig(ctx,
awsconfig.WithRegion(cfg.Region),
awsconfig.WithCredentialsProvider(credentialsProvider),
)
if err != nil {
return trace.Wrap(err)
}
otelaws.AppendMiddlewares(&awsCfg.APIOptions)
cfg.StorerQuerierAWSConfig = &awsCfg

return nil
}

// Log is an events storage backend.
//
// It's using SNS for emitting events.
Expand Down Expand Up @@ -402,7 +440,7 @@ func New(ctx context.Context, cfg Config) (*Log, error) {
queryResultsS3: cfg.QueryResultsS3,
getQueryResultsInterval: cfg.GetQueryResultsInterval,
disableQueryCostOptimization: cfg.DisableSearchCostOptimization,
awsCfg: cfg.AWSConfig,
awsCfg: cfg.StorerQuerierAWSConfig,
logger: cfg.LogEntry,
clock: cfg.Clock,
tracer: cfg.Tracer,
Expand Down
77 changes: 40 additions & 37 deletions lib/events/athena/athena_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,16 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) {
backend.Backend
}

dummyAWSCfg := &aws.Config{}
validConfig := Config{
Database: "db",
TableName: "tbl",
TopicARN: "arn:topic",
LargeEventsS3: "s3://large-payloads-bucket",
LocationS3: "s3://events-bucket",
QueueURL: "https://queue-url",
AWSConfig: &aws.Config{},
Backend: mockBackend{},
Database: "db",
TableName: "tbl",
TopicARN: "arn:topic",
LargeEventsS3: "s3://large-payloads-bucket",
LocationS3: "s3://events-bucket",
QueueURL: "https://queue-url",
PublisherConsumerAWSConfig: dummyAWSCfg,
Backend: mockBackend{},
}
tests := []struct {
name string
Expand All @@ -170,19 +171,20 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) {
return validConfig
},
want: Config{
Database: "db",
TableName: "tbl",
TopicARN: "arn:topic",
LargeEventsS3: "s3://large-payloads-bucket",
largeEventsBucket: "large-payloads-bucket",
LocationS3: "s3://events-bucket",
locationS3Bucket: "events-bucket",
QueueURL: "https://queue-url",
GetQueryResultsInterval: 100 * time.Millisecond,
BatchMaxItems: 20000,
BatchMaxInterval: 1 * time.Minute,
AWSConfig: &aws.Config{},
Backend: mockBackend{},
Database: "db",
TableName: "tbl",
TopicARN: "arn:topic",
LargeEventsS3: "s3://large-payloads-bucket",
largeEventsBucket: "large-payloads-bucket",
LocationS3: "s3://events-bucket",
locationS3Bucket: "events-bucket",
QueueURL: "https://queue-url",
GetQueryResultsInterval: 100 * time.Millisecond,
BatchMaxItems: 20000,
BatchMaxInterval: 1 * time.Minute,
PublisherConsumerAWSConfig: dummyAWSCfg,
StorerQuerierAWSConfig: dummyAWSCfg,
Backend: mockBackend{},
},
},
{
Expand All @@ -194,22 +196,23 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) {
return cfg
},
want: Config{
Database: "db",
TableName: "tbl",
TopicARN: "arn:topic",
LargeEventsS3: "s3://large-payloads-bucket",
largeEventsBucket: "large-payloads-bucket",
LocationS3: "s3://events-bucket",
locationS3Bucket: "events-bucket",
QueueURL: "https://queue-url",
GetQueryResultsInterval: 100 * time.Millisecond,
BatchMaxItems: 20000,
BatchMaxInterval: 1 * time.Minute,
AWSConfig: &aws.Config{},
Backend: mockBackend{},
LimiterRefillTime: 1 * time.Second,
LimiterBurst: 10,
LimiterRefillAmount: 5,
Database: "db",
TableName: "tbl",
TopicARN: "arn:topic",
LargeEventsS3: "s3://large-payloads-bucket",
largeEventsBucket: "large-payloads-bucket",
LocationS3: "s3://events-bucket",
locationS3Bucket: "events-bucket",
QueueURL: "https://queue-url",
GetQueryResultsInterval: 100 * time.Millisecond,
BatchMaxItems: 20000,
BatchMaxInterval: 1 * time.Minute,
PublisherConsumerAWSConfig: dummyAWSCfg,
StorerQuerierAWSConfig: dummyAWSCfg,
Backend: mockBackend{},
LimiterRefillTime: 1 * time.Second,
LimiterBurst: 10,
LimiterRefillAmount: 5,
},
},
{
Expand Down
9 changes: 4 additions & 5 deletions lib/events/athena/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,13 @@ type s3downloader interface {
}

func newConsumer(cfg Config, cancelFn context.CancelFunc, metricConsumerBatchProcessingDuration prometheus.Histogram) (*consumer, error) {
s3client := s3.NewFromConfig(*cfg.AWSConfig)
sqsClient := sqs.NewFromConfig(*cfg.AWSConfig)
sqsClient := sqs.NewFromConfig(*cfg.PublisherConsumerAWSConfig)

collectCfg := sqsCollectConfig{
sqsReceiver: sqsClient,
queueURL: cfg.QueueURL,
// TODO(tobiaszheller): use s3 manager from teleport observability.
payloadDownloader: manager.NewDownloader(s3client),
// TODO(nklaassen): use s3 manager from teleport observability.
payloadDownloader: manager.NewDownloader(s3.NewFromConfig(*cfg.PublisherConsumerAWSConfig)),
payloadBucket: cfg.largeEventsBucket,
visibilityTimeout: int32(cfg.BatchMaxInterval.Seconds()),
batchMaxItems: cfg.BatchMaxItems,
Expand Down Expand Up @@ -140,7 +139,7 @@ func newConsumer(cfg Config, cancelFn context.CancelFunc, metricConsumerBatchPro
queueURL: cfg.QueueURL,
perDateFileParquetWriter: func(ctx context.Context, date string) (io.WriteCloser, error) {
key := fmt.Sprintf("%s/%s/%s.parquet", cfg.locationS3Prefix, date, uuid.NewString())
fw, err := awsutils.NewS3V2FileWriter(ctx, s3client, cfg.locationS3Bucket, key, nil /* uploader options */, func(poi *s3.PutObjectInput) {
fw, err := awsutils.NewS3V2FileWriter(ctx, s3.NewFromConfig(*cfg.StorerQuerierAWSConfig), cfg.locationS3Bucket, key, nil /* uploader options */, func(poi *s3.PutObjectInput) {
// ChecksumAlgorithm is required for putting objects when object lock is enabled.
poi.ChecksumAlgorithm = s3Types.ChecksumAlgorithmSha256
})
Expand Down
4 changes: 2 additions & 2 deletions lib/events/athena/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,11 @@ func newPublisherFromAthenaConfig(cfg Config) *publisher {
})
return NewPublisher(PublisherConfig{
TopicARN: cfg.TopicARN,
SNSPublisher: sns.NewFromConfig(*cfg.AWSConfig, func(o *sns.Options) {
SNSPublisher: sns.NewFromConfig(*cfg.PublisherConsumerAWSConfig, func(o *sns.Options) {
o.Retryer = r
}),
// TODO(tobiaszheller): consider reworking lib/observability to work also on s3 sdk-v2.
Uploader: manager.NewUploader(s3.NewFromConfig(*cfg.AWSConfig)),
Uploader: manager.NewUploader(s3.NewFromConfig(*cfg.PublisherConsumerAWSConfig)),
PayloadBucket: cfg.largeEventsBucket,
PayloadPrefix: cfg.largeEventsPrefix,
})
Expand Down
2 changes: 1 addition & 1 deletion lib/events/s3sessions/s3handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type Config struct {
ACL string
// Session is an optional existing AWS client session
Session *awssession.Session
// Credentials if supplied are used in tests
// Credentials if supplied are used in tests or with external cloud audit.
Credentials *credentials.Credentials
// SSEKMSKey specifies the optional custom CMK used for KMS SSE.
SSEKMSKey string
Expand Down
2 changes: 1 addition & 1 deletion lib/integrations/externalcloudaudit/configurator.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func newConfigurator(ctx context.Context, spec *externalcloudaudit.ExternalCloud
// IsUsed returns a boolean indicating whether the ExternalCloudAudit feature
// is currently in active use. It is safe to call on a nil receiver, which
// reports false; callers may hold a nil *Configurator when the feature is not
// configured.
func (c *Configurator) IsUsed() bool {
	if c == nil {
		return false
	}
	return c.isUsed
}

// GetSpec returns the current active ExternalCloudAuditSpec.
Expand Down
68 changes: 61 additions & 7 deletions lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"sync/atomic"
"time"

awscredentials "github.com/aws/aws-sdk-go/aws/credentials"
"github.com/google/uuid"
"github.com/gravitational/roundtrip"
"github.com/gravitational/trace"
Expand Down Expand Up @@ -101,6 +102,7 @@ import (
"github.com/gravitational/teleport/lib/events/pgevents"
"github.com/gravitational/teleport/lib/events/s3sessions"
"github.com/gravitational/teleport/lib/httplib"
"github.com/gravitational/teleport/lib/integrations/externalcloudaudit"
"github.com/gravitational/teleport/lib/inventory"
"github.com/gravitational/teleport/lib/joinserver"
kubegrpc "github.com/gravitational/teleport/lib/kube/grpc"
Expand Down Expand Up @@ -1380,7 +1382,7 @@ func adminCreds() (*int, *int, error) {
// When configured to store session recordings in external storage, this will be an API client for
// cloud-provider storage. Otherwise a local file-based handler is used which stores the recordings
// on disk.
func initAuthUploadHandler(ctx context.Context, auditConfig types.ClusterAuditConfig, dataDir string) (events.MultipartHandler, error) {
func initAuthUploadHandler(ctx context.Context, auditConfig types.ClusterAuditConfig, dataDir string, externalCloudAudit *externalcloudaudit.Configurator) (events.MultipartHandler, error) {
if !auditConfig.ShouldUploadSessions() {
recordsDir := filepath.Join(dataDir, events.RecordsDir)
if err := os.MkdirAll(recordsDir, teleport.SharedDirMode); err != nil {
Expand Down Expand Up @@ -1411,9 +1413,18 @@ func initAuthUploadHandler(ctx context.Context, auditConfig types.ClusterAuditCo
}
return handler, nil
case teleport.SchemeS3:
config := s3sessions.Config{UseFIPSEndpoint: auditConfig.GetUseFIPSEndpoint()}

if err := config.SetFromURL(uri, auditConfig.Region()); err != nil {
config := s3sessions.Config{
UseFIPSEndpoint: auditConfig.GetUseFIPSEndpoint(),
}
s3URI := uri
if externalCloudAudit.IsUsed() {
config.Credentials = awscredentials.NewCredentials(externalCloudAudit.CredentialsProviderSDKV1())
s3URI, err = apiutils.ParseSessionsURI(externalCloudAudit.GetSpec().SessionsRecordingsURI)
if err != nil {
return nil, trace.Wrap(err)
}
}
if err := config.SetFromURL(s3URI, auditConfig.Region()); err != nil {
return nil, trace.Wrap(err)
}

Expand Down Expand Up @@ -1453,7 +1464,7 @@ func initAuthUploadHandler(ctx context.Context, auditConfig types.ClusterAuditCo
}

// initAuthExternalAuditLog initializes the auth server's audit log.
func initAuthExternalAuditLog(ctx context.Context, auditConfig types.ClusterAuditConfig, backend backend.Backend, tracingProvider *tracing.Provider) (events.AuditLogger, error) {
func initAuthExternalAuditLog(ctx context.Context, auditConfig types.ClusterAuditConfig, backend backend.Backend, tracingProvider *tracing.Provider, externalCloudAudit *externalcloudaudit.Configurator) (events.AuditLogger, error) {
var hasNonFileLog bool
var loggers []events.AuditLogger
for _, eventsURI := range auditConfig.AuditEventsURIs() {
Expand Down Expand Up @@ -1526,6 +1537,11 @@ func initAuthExternalAuditLog(ctx context.Context, auditConfig types.ClusterAudi
if err != nil {
return nil, trace.Wrap(err)
}
if externalCloudAudit.IsUsed() {
if err := cfg.UpdateForExternalCloudAudit(ctx, externalCloudAudit.GetSpec(), externalCloudAudit.CredentialsProvider()); err != nil {
return nil, trace.Wrap(err)
}
}
var logger events.AuditLogger
logger, err = athena.New(ctx, cfg)
if err != nil {
Expand Down Expand Up @@ -1606,6 +1622,8 @@ func (process *TeleportProcess) initAuthService() error {
var emitter apievents.Emitter
var streamer events.Streamer
var uploadHandler events.MultipartHandler
var externalCloudAudit *externalcloudaudit.Configurator

// create the audit log, which will be consuming (and recording) all events
// and recording all sessions.
if cfg.Auth.NoAudit {
Expand All @@ -1630,8 +1648,13 @@ func (process *TeleportProcess) initAuthService() error {
cfg.Auth.AuditConfig.SetUseFIPSEndpoint(types.ClusterAuditConfigSpecV2_FIPS_ENABLED)
}

externalCloudAudit, err = process.newExternalCloudAuditConfigurator()
if err != nil {
return trace.Wrap(err)
}

uploadHandler, err = initAuthUploadHandler(
process.ExitContext(), cfg.Auth.AuditConfig, filepath.Join(cfg.DataDir, teleport.LogsDir))
process.ExitContext(), cfg.Auth.AuditConfig, filepath.Join(cfg.DataDir, teleport.LogsDir), externalCloudAudit)
if err != nil {
if !trace.IsNotFound(err) {
return trace.Wrap(err)
Expand All @@ -1643,9 +1666,10 @@ func (process *TeleportProcess) initAuthService() error {
if err != nil {
return trace.Wrap(err)
}

// initialize external loggers. may return (nil, nil) if no
// external loggers have been defined.
externalLog, err := initAuthExternalAuditLog(process.ExitContext(), cfg.Auth.AuditConfig, process.backend, process.TracingProvider)
externalLog, err := initAuthExternalAuditLog(process.ExitContext(), cfg.Auth.AuditConfig, process.backend, process.TracingProvider, externalCloudAudit)
if err != nil {
if !trace.IsNotFound(err) {
return trace.Wrap(err)
Expand Down Expand Up @@ -1814,6 +1838,10 @@ func (process *TeleportProcess) initAuthService() error {
}
authServer.SetLockWatcher(lockWatcher)

if externalCloudAudit.IsUsed() {
externalCloudAudit.SetGenerateOIDCTokenFn(authServer.GenerateExternalCloudAuditOIDCToken)
}

unifiedResourcesCache, err := services.NewUnifiedResourceCache(process.ExitContext(), services.UnifiedResourceCacheConfig{
ResourceWatcherConfig: services.ResourceWatcherConfig{
QueueSize: defaults.UnifiedResourcesQueueSize,
Expand Down Expand Up @@ -6080,3 +6108,29 @@ func makeXForwardedForMiddleware(cfg *servicecfg.Config) utils.HTTPMiddleware {
}
return utils.NoopHTTPMiddleware
}

func (process *TeleportProcess) newExternalCloudAuditConfigurator() (*externalcloudaudit.Configurator, error) {
watcher, err := local.NewClusterExternalAuditWatcher(process.GracefulExitContext(), local.ClusterExternalCloudAuditWatcherConfig{
Backend: process.backend,
OnChange: func() {
// On change of cluster external cloud audit, trigger teleport
// reload, because s3 uploader and athena components don't support
// live changes to their configuration.
process.BroadcastEvent(Event{Name: TeleportReloadEvent})
},
})
if err != nil {
return nil, trace.Wrap(err)
}
// Wait for the watcher to init to avoid a race in case the external cloud
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Wait for the watcher to init to avoid a race in case the external cloud
// Wait for the watcher init to avoid a race in case the external cloud

I think you meant watcher init only?

// audit config changes after the configurator loads it and before the
// watcher initialized.
watcher.WaitInit(process.GracefulExitContext())

ecaSvc := local.NewExternalCloudAuditService(process.backend)
integrationSvc, err := local.NewIntegrationsService(process.backend)
if err != nil {
return nil, trace.Wrap(err)
}
return externalcloudaudit.NewConfigurator(process.ExitContext(), ecaSvc, integrationSvc)
}
Loading