8 changes: 5 additions & 3 deletions lib/events/athena/consumer.go
@@ -26,6 +26,7 @@ import (
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
s3Types "github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/aws/aws-sdk-go-v2/service/sqs"
sqsTypes "github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/google/uuid"
@@ -133,9 +134,10 @@ func newConsumer(cfg Config, cancelFn context.CancelFunc) (*consumer, error) {
queueURL: cfg.QueueURL,
perDateFileParquetWriter: func(ctx context.Context, date string) (source.ParquetFile, error) {
key := fmt.Sprintf("%s/%s/%s.parquet", cfg.locationS3Prefix, date, uuid.NewString())

// TODO(tobiaszheller): verify acl, kms customer key, object lock, etc. later.
fw, err := s3v2.NewS3FileWriterWithClient(ctx, s3client, cfg.locationS3Bucket, key, nil)
fw, err := s3v2.NewS3FileWriterWithClient(ctx, s3client, cfg.locationS3Bucket, key, nil /* uploader options */, func(poi *s3.PutObjectInput) {
// ChecksumAlgorithm is required for putting objects when object lock is enabled.
poi.ChecksumAlgorithm = s3Types.ChecksumAlgorithmSha256
})
if err != nil {
return nil, trace.Wrap(err)
}
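For context on the ChecksumAlgorithm change above: S3 rejects object uploads to a bucket with object lock enabled unless the request carries a content checksum (or a Content-MD5 header), which is why the parquet writer now passes the extra PutObjectInput option. Below is a minimal standalone sketch of the same requirement, assuming an already-configured s3.Client and illustrative bucket/key names (hypothetical, not the ones Teleport uses); without the checksum, the PutObject call to a lock-enabled bucket is rejected by S3.

import (
	"bytes"
	"context"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	s3Types "github.com/aws/aws-sdk-go-v2/service/s3/types"
)

// putToLockedBucket is a hypothetical helper: it uploads a payload to a bucket
// that has object lock enabled, which requires a checksum on the request.
func putToLockedBucket(ctx context.Context, client *s3.Client, payload []byte) error {
	_, err := client.PutObject(ctx, &s3.PutObjectInput{
		Bucket:            aws.String("example-auditlogs-locking"), // hypothetical bucket
		Key:               aws.String("events/2023-01-01/example.parquet"),
		Body:              bytes.NewReader(payload),
		ChecksumAlgorithm: s3Types.ChecksumAlgorithmSha256,
	})
	return err
}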
115 changes: 97 additions & 18 deletions lib/events/athena/integration_test.go
@@ -32,6 +32,8 @@ import (
athenaTypes "github.com/aws/aws-sdk-go-v2/service/athena/types"
"github.com/aws/aws-sdk-go-v2/service/glue"
glueTypes "github.com/aws/aws-sdk-go-v2/service/glue/types"
"github.com/aws/aws-sdk-go-v2/service/s3"
s3Types "github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/aws/aws-sdk-go-v2/service/sns"
"github.com/aws/aws-sdk-go-v2/service/sqs"
sqsTypes "github.com/aws/aws-sdk-go-v2/service/sqs/types"
@@ -51,15 +53,17 @@ import (
)

type athenaContext struct {
log *Log
clock clockwork.Clock
testID string
database string
tablename string
s3eventsLocation string
s3resultsLocation string
s3largePayloads string
batcherInterval time.Duration
log *Log
clock clockwork.Clock
testID string
database string
bucketForEvents string
bucketForTempFiles string
tablename string
s3eventsLocation string
s3resultsLocation string
s3largePayloads string
batcherInterval time.Duration
}

func TestIntegrationAthenaSearchSessionEventsBySessionID(t *testing.T) {
@@ -168,16 +172,19 @@ func setupAthenaContext(t *testing.T, ctx context.Context, cfg athenaContextConf
t.Cleanup(func() {
assert.NoError(t, backend.Close())
})

bucketWithLocking := "auditlogs-integrationtests-locking"
bucketForTemporaryFiles := "auditlogs-integrationtests"
ac := &athenaContext{
clock: clock,
testID: testID,
database: "auditlogs_integrationtests",
s3eventsLocation: fmt.Sprintf("%s/%s/events", "s3://auditlogs-integrationtests", testID),
s3resultsLocation: fmt.Sprintf("%s/%s/results", "s3://auditlogs-integrationtests", testID),
s3largePayloads: fmt.Sprintf("%s/%s/large_payloads", "s3://auditlogs-integrationtests", testID),
tablename: strings.ReplaceAll(testID, "-", "_"),
batcherInterval: 10 * time.Second,
clock: clock,
testID: testID,
database: "auditlogs_integrationtests",
bucketForEvents: bucketWithLocking,
bucketForTempFiles: bucketForTemporaryFiles,
s3eventsLocation: fmt.Sprintf("s3://%s/%s/events", bucketWithLocking, testID),
s3resultsLocation: fmt.Sprintf("s3://%s/%s/results", bucketForTemporaryFiles, testID),
s3largePayloads: fmt.Sprintf("s3://%s/%s/large_payloads", bucketForTemporaryFiles, testID),
tablename: strings.ReplaceAll(testID, "-", "_"),
batcherInterval: 10 * time.Second,
}
infraOut := ac.setupInfraWithCleanup(t, ctx)

@@ -310,6 +317,78 @@ func (ac *athenaContext) setupInfraWithCleanup(t *testing.T, ctx context.Context
})
require.NoError(t, err)

// Create the bucket for long-term storage if it does not exist. The bucket has object locking,
// which prevents objects from being deleted, so it may already exist from a previous run.
// The retention period takes care of cleaning up files.
s3Client := s3.NewFromConfig(awsCfg)
_, err = s3Client.HeadBucket(ctx, &s3.HeadBucketInput{
Bucket: aws.String(ac.bucketForEvents),
})
if err != nil {
var notFound *s3Types.NotFound
if errors.As(err, &notFound) {
_, err = s3Client.CreateBucket(ctx, &s3.CreateBucketInput{
Bucket: aws.String(ac.bucketForEvents),
ObjectLockEnabledForBucket: true,
CreateBucketConfiguration: &s3Types.CreateBucketConfiguration{
LocationConstraint: s3Types.BucketLocationConstraint(awsCfg.Region),
},
})
require.NoError(t, err)
_, err = s3Client.PutObjectLockConfiguration(ctx, &s3.PutObjectLockConfigurationInput{
Bucket: aws.String(ac.bucketForEvents),
ObjectLockConfiguration: &s3Types.ObjectLockConfiguration{
ObjectLockEnabled: s3Types.ObjectLockEnabledEnabled,
Rule: &s3Types.ObjectLockRule{
DefaultRetention: &s3Types.DefaultRetention{
Days: 1,
Mode: s3Types.ObjectLockRetentionModeGovernance,
},
},
},
})
require.NoError(t, err)
} else {
assert.Fail(t, "unexpected err", err)
}
}

// Create the bucket for temporary files (large payloads and query results) if it does not exist.
// The lifecycle expiration rule below takes care of cleaning up files.
_, err = s3Client.HeadBucket(ctx, &s3.HeadBucketInput{
Bucket: aws.String(ac.bucketForTempFiles),
})
if err != nil {
var notFound *s3Types.NotFound
if errors.As(err, &notFound) {
_, createErr := s3Client.CreateBucket(ctx, &s3.CreateBucketInput{
Bucket: aws.String(ac.bucketForTempFiles),
CreateBucketConfiguration: &s3Types.CreateBucketConfiguration{
LocationConstraint: s3Types.BucketLocationConstraint(awsCfg.Region),
},
})
require.NoError(t, createErr)
} else {
assert.Fail(t, "unexpected err", err)
}
}
_, err = s3Client.PutBucketLifecycleConfiguration(ctx, &s3.PutBucketLifecycleConfigurationInput{
Bucket: aws.String(ac.bucketForTempFiles),
LifecycleConfiguration: &s3Types.BucketLifecycleConfiguration{
Rules: []s3Types.LifecycleRule{
{
Status: s3Types.ExpirationStatusEnabled,
Expiration: &s3Types.LifecycleExpiration{
Days: 1,
},
// Prefix is a required field; an empty value applies the rule to the whole bucket.
Prefix: aws.String(""),
},
},
},
})
require.NoError(t, err)

// Create glue db if not exists
glueClient := glue.NewFromConfig(awsCfg)
_, err = glueClient.GetDatabase(ctx, &glue.GetDatabaseInput{
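A usage note on the bucket setup above: the long-term bucket gets a governance-mode default retention of one day, which S3 enforces on every object version but which principals holding s3:BypassGovernanceRetention can override, making it a reasonable fit for integration tests. If the test infrastructure ever needs to confirm that the lock settings were applied, they can be read back. A minimal sketch, assuming a configured s3.Client and the bucket name used by the test:

import (
	"context"
	"fmt"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

// printLockConfig reads back the configuration written by PutObjectLockConfiguration.
func printLockConfig(ctx context.Context, client *s3.Client) error {
	out, err := client.GetObjectLockConfiguration(ctx, &s3.GetObjectLockConfigurationInput{
		Bucket: aws.String("auditlogs-integrationtests-locking"),
	})
	if err != nil {
		return err
	}
	cfg := out.ObjectLockConfiguration
	fmt.Printf("object lock: %v, default retention mode: %v\n",
		cfg.ObjectLockEnabled, cfg.Rule.DefaultRetention.Mode)
	return nil
}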