From becbcef19a3a0b9ed697b03d56c6d1e13bbe3584 Mon Sep 17 00:00:00 2001 From: Tobiasz Heller Date: Thu, 18 May 2023 18:53:29 +0200 Subject: [PATCH] athena audit logs - sent checksum on s3 write --- lib/events/athena/consumer.go | 8 +- lib/events/athena/integration_test.go | 115 ++++++++++++++++++++++---- 2 files changed, 102 insertions(+), 21 deletions(-) diff --git a/lib/events/athena/consumer.go b/lib/events/athena/consumer.go index 801afcb8b9aa1..bc83e21b7491b 100644 --- a/lib/events/athena/consumer.go +++ b/lib/events/athena/consumer.go @@ -26,6 +26,7 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/feature/s3/manager" "github.com/aws/aws-sdk-go-v2/service/s3" + s3Types "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/aws/aws-sdk-go-v2/service/sqs" sqsTypes "github.com/aws/aws-sdk-go-v2/service/sqs/types" "github.com/google/uuid" @@ -133,9 +134,10 @@ func newConsumer(cfg Config, cancelFn context.CancelFunc) (*consumer, error) { queueURL: cfg.QueueURL, perDateFileParquetWriter: func(ctx context.Context, date string) (source.ParquetFile, error) { key := fmt.Sprintf("%s/%s/%s.parquet", cfg.locationS3Prefix, date, uuid.NewString()) - - // TODO(tobiaszheller): verify later acl, kms customer, object lock etc. - fw, err := s3v2.NewS3FileWriterWithClient(ctx, s3client, cfg.locationS3Bucket, key, nil) + fw, err := s3v2.NewS3FileWriterWithClient(ctx, s3client, cfg.locationS3Bucket, key, nil /* uploader options */, func(poi *s3.PutObjectInput) { + // ChecksumAlgorithm is required for putting objects when object lock is enabled. 
+ poi.ChecksumAlgorithm = s3Types.ChecksumAlgorithmSha256 + }) if err != nil { return nil, trace.Wrap(err) } diff --git a/lib/events/athena/integration_test.go b/lib/events/athena/integration_test.go index 44cf2dd0afd72..dc4dfad8d3719 100644 --- a/lib/events/athena/integration_test.go +++ b/lib/events/athena/integration_test.go @@ -32,6 +32,8 @@ import ( athenaTypes "github.com/aws/aws-sdk-go-v2/service/athena/types" "github.com/aws/aws-sdk-go-v2/service/glue" glueTypes "github.com/aws/aws-sdk-go-v2/service/glue/types" + "github.com/aws/aws-sdk-go-v2/service/s3" + s3Types "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/aws/aws-sdk-go-v2/service/sns" "github.com/aws/aws-sdk-go-v2/service/sqs" sqsTypes "github.com/aws/aws-sdk-go-v2/service/sqs/types" @@ -51,15 +53,17 @@ import ( ) type athenaContext struct { - log *Log - clock clockwork.Clock - testID string - database string - tablename string - s3eventsLocation string - s3resultsLocation string - s3largePayloads string - batcherInterval time.Duration + log *Log + clock clockwork.Clock + testID string + database string + bucketForEvents string + bucketForTempFiles string + tablename string + s3eventsLocation string + s3resultsLocation string + s3largePayloads string + batcherInterval time.Duration } func TestIntegrationAthenaSearchSessionEventsBySessionID(t *testing.T) { @@ -168,16 +172,19 @@ func setupAthenaContext(t *testing.T, ctx context.Context, cfg athenaContextConf t.Cleanup(func() { assert.NoError(t, backend.Close()) }) - + bucketWithLocking := "auditlogs-integrationtests-locking" + bucketForTemporaryFiles := "auditlogs-integrationtests" ac := &athenaContext{ - clock: clock, - testID: testID, - database: "auditlogs_integrationtests", - s3eventsLocation: fmt.Sprintf("%s/%s/events", "s3://auditlogs-integrationtests", testID), - s3resultsLocation: fmt.Sprintf("%s/%s/results", "s3://auditlogs-integrationtests", testID), - s3largePayloads: fmt.Sprintf("%s/%s/large_payloads", 
"s3://auditlogs-integrationtests", testID), - tablename: strings.ReplaceAll(testID, "-", "_"), - batcherInterval: 10 * time.Second, + clock: clock, + testID: testID, + database: "auditlogs_integrationtests", + bucketForEvents: bucketWithLocking, + bucketForTempFiles: bucketForTemporaryFiles, + s3eventsLocation: fmt.Sprintf("s3://%s/%s/events", bucketWithLocking, testID), + s3resultsLocation: fmt.Sprintf("s3://%s/%s/results", bucketForTemporaryFiles, testID), + s3largePayloads: fmt.Sprintf("s3://%s/%s/large_payloads", bucketForTemporaryFiles, testID), + tablename: strings.ReplaceAll(testID, "-", "_"), + batcherInterval: 10 * time.Second, } infraOut := ac.setupInfraWithCleanup(t, ctx) @@ -310,6 +317,78 @@ func (ac *athenaContext) setupInfraWithCleanup(t *testing.T, ctx context.Context }) require.NoError(t, err) + // Create bucket for long term storage if not exists. Bucket will have object locking which + // prevents from deleting objects, that's why it can exists before. + // Retention period will take care of cleanup of files. 
+ s3Client := s3.NewFromConfig(awsCfg) + _, err = s3Client.HeadBucket(ctx, &s3.HeadBucketInput{ + Bucket: aws.String(ac.bucketForEvents), + }) + if err != nil { + var notFound *s3Types.NotFound + if errors.As(err, &notFound) { + _, err = s3Client.CreateBucket(ctx, &s3.CreateBucketInput{ + Bucket: aws.String(ac.bucketForEvents), + ObjectLockEnabledForBucket: true, + CreateBucketConfiguration: &s3Types.CreateBucketConfiguration{ + LocationConstraint: s3Types.BucketLocationConstraint(awsCfg.Region), + }, + }) + require.NoError(t, err) + _, err = s3Client.PutObjectLockConfiguration(ctx, &s3.PutObjectLockConfigurationInput{ + Bucket: aws.String(ac.bucketForEvents), + ObjectLockConfiguration: &s3Types.ObjectLockConfiguration{ + ObjectLockEnabled: s3Types.ObjectLockEnabledEnabled, + Rule: &s3Types.ObjectLockRule{ + DefaultRetention: &s3Types.DefaultRetention{ + Days: 1, + Mode: s3Types.ObjectLockRetentionModeGovernance, + }, + }, + }, + }) + require.NoError(t, err) + } else { + assert.Fail(t, "unexpected err", err) + } + } + + // Create the bucket for temporary files (large payloads and query results) if it does not exist. + // Retention period will take care of cleanup of files. 
+ _, err = s3Client.HeadBucket(ctx, &s3.HeadBucketInput{ + Bucket: aws.String(ac.bucketForTempFiles), + }) + if err != nil { + var notFound *s3Types.NotFound + if errors.As(err, &notFound) { + _, createErr := s3Client.CreateBucket(ctx, &s3.CreateBucketInput{ + Bucket: aws.String(ac.bucketForTempFiles), + CreateBucketConfiguration: &s3Types.CreateBucketConfiguration{ + LocationConstraint: s3Types.BucketLocationConstraint(awsCfg.Region), + }, + }) + require.NoError(t, createErr) + } else { + assert.Fail(t, "unexpected err", err) + } + } + _, err = s3Client.PutBucketLifecycleConfiguration(ctx, &s3.PutBucketLifecycleConfigurationInput{ + Bucket: aws.String(ac.bucketForTempFiles), + LifecycleConfiguration: &s3Types.BucketLifecycleConfiguration{ + Rules: []s3Types.LifecycleRule{ + { + Status: s3Types.ExpirationStatusEnabled, + Expiration: &s3Types.LifecycleExpiration{ + Days: 1, + }, + // Prefix is a required field; an empty prefix applies the rule to the whole bucket. + Prefix: aws.String(""), + }, + }, + }, + }) + require.NoError(t, err) + // Create glue db if not exists glueClient := glue.NewFromConfig(awsCfg) _, err = glueClient.GetDatabase(ctx, &glue.GetDatabaseInput{