diff --git a/lib/events/athena/querier_test.go b/lib/events/athena/querier_test.go index 4a82a412408b8..0a52a66cef7d2 100644 --- a/lib/events/athena/querier_test.go +++ b/lib/events/athena/querier_test.go @@ -551,7 +551,7 @@ func TestSearchEvents(t *testing.T) { Limit: 100, Order: types.EventOrderAscending, // startKey generated by dynamo which points to Apr 27 2023 08:22:58 UTC - StartKey: `{"date":"2023-04-27","iterator":{"CreatedAt":{"B":null,"BOOL":null,"BS":null,"L":null,"M":null,"N":"1682583778","NS":null,"NULL":null,"S":null,"SS":null},"CreatedAtDate":{"B":null,"BOOL":null,"BS":null,"L":null,"M":null,"N":null,"NS":null,"NULL":null,"S":"2023-04-27","SS":null},"EventIndex":{"B":null,"BOOL":null,"BS":null,"L":null,"M":null,"N":"0","NS":null,"NULL":null,"S":null,"SS":null},"SessionID":{"B":null,"BOOL":null,"BS":null,"L":null,"M":null,"N":null,"NS":null,"NULL":null,"S":"4bc51fd7-4f0c-47ee-b9a5-da621fbdbabb","SS":null}}}`, + StartKey: `{"date":"2023-04-27","iterator":"{\"CreatedAt\":1682583778,\"CreatedAtDate\":\"2023-04-27\",\"EventIndex\":0,\"SessionID\":\"4bc51fd7-4f0c-47ee-b9a5-da621fbdbabb\"}"}`, }, queryResultsResps: singleCallResults(100), check: func(t *testing.T, mock *mockAthenaExecutor, paginationKey string) { @@ -572,12 +572,11 @@ func TestSearchEvents(t *testing.T) { searchParams: &events.SearchEventsRequest{ // To is set here as value from keyset -5h to test case // when cost optimized search should not be used. - From: dynamoKeysetTimestamp.Add(-5 * time.Hour), - To: toUTC, - Limit: 100, - Order: types.EventOrderDescending, - // startKey generated by dynamo which points to Apr 27 2023 08:22:58 UTC - StartKey: `{"date":"2023-04-27","iterator":{"CreatedAt":{"B":null,"BOOL":null,"BS":null,"L":null,"M":null,"N":"1682583778","NS":null,"NULL":null,"S":null,"SS":null},"CreatedAtDate":{"B":null,"BOOL":null,"BS":null,"L":null,"M":null,"N":null,"NS":null,"NULL":null,"S":"2023-04-27","SS":null},"EventIndex":{"B":null,"BOOL":null,"BS":null,"L":null,"M":null,"N":"0","NS":null,"NULL":null,"S":null,"SS":null},"SessionID":{"B":null,"BOOL":null,"BS":null,"L":null,"M":null,"N":null,"NS":null,"NULL":null,"S":"4bc51fd7-4f0c-47ee-b9a5-da621fbdbabb","SS":null}}}`, + From: dynamoKeysetTimestamp.Add(-5 * time.Hour), + To: toUTC, + Limit: 100, + Order: types.EventOrderDescending, + StartKey: `{"date":"2023-04-27","iterator":"{\"CreatedAt\":1682583778,\"CreatedAtDate\":\"2023-04-27\",\"EventIndex\":0,\"SessionID\":\"4bc51fd7-4f0c-47ee-b9a5-da621fbdbabb\"}"}`, }, queryResultsResps: singleCallResults(100), check: func(t *testing.T, mock *mockAthenaExecutor, paginationKey string) { diff --git a/lib/events/dynamoevents/dynamoevents.go b/lib/events/dynamoevents/dynamoevents.go index 8482e432f88b0..caa70e773684b 100644 --- a/lib/events/dynamoevents/dynamoevents.go +++ b/lib/events/dynamoevents/dynamoevents.go @@ -27,20 +27,22 @@ import ( "fmt" "maps" "math" + "net/http" "net/url" "sort" "strconv" "strings" "time" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/awserr" - "github.com/aws/aws-sdk-go/aws/request" - awssession "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/applicationautoscaling" - "github.com/aws/aws-sdk-go/service/dynamodb" - "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute" - "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" + "github.com/aws/aws-sdk-go-v2/service/applicationautoscaling" + autoscalingtypes "github.com/aws/aws-sdk-go-v2/service/applicationautoscaling/types" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + dynamodbtypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/aws/smithy-go" + smithyendpoints "github.com/aws/smithy-go/endpoints" "github.com/google/uuid" "github.com/gravitational/trace" "github.com/jonboulle/clockwork" @@ -50,8 +52,9 @@ import ( apidefaults "github.com/gravitational/teleport/api/defaults" "github.com/gravitational/teleport/api/types" apievents "github.com/gravitational/teleport/api/types/events" + "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/events" - dynamometrics "github.com/gravitational/teleport/lib/observability/metrics/dynamo" + "github.com/gravitational/teleport/lib/modules" "github.com/gravitational/teleport/lib/utils" ) @@ -71,24 +74,24 @@ const ( ) // Defines the attribute schema for the DynamoDB event table and index. -var tableSchema = []*dynamodb.AttributeDefinition{ +var tableSchema = []dynamodbtypes.AttributeDefinition{ // Existing attributes pre RFD 24. { AttributeName: aws.String(keySessionID), - AttributeType: aws.String("S"), + AttributeType: dynamodbtypes.ScalarAttributeTypeS, }, { AttributeName: aws.String(keyEventIndex), - AttributeType: aws.String("N"), + AttributeType: dynamodbtypes.ScalarAttributeTypeN, }, { AttributeName: aws.String(keyCreatedAt), - AttributeType: aws.String("N"), + AttributeType: dynamodbtypes.ScalarAttributeTypeN, }, // New attribute in RFD 24. { AttributeName: aws.String(keyDate), - AttributeType: aws.String("S"), + AttributeType: dynamodbtypes.ScalarAttributeTypeS, }, } @@ -116,15 +119,15 @@ type Config struct { DisableConflictCheck bool // ReadMaxCapacity is the maximum provisioned read capacity. - ReadMaxCapacity int64 + ReadMaxCapacity int32 // ReadMinCapacity is the minimum provisioned read capacity. - ReadMinCapacity int64 + ReadMinCapacity int32 // ReadTargetValue is the ratio of consumed read to provisioned capacity. ReadTargetValue float64 // WriteMaxCapacity is the maximum provisioned write capacity. - WriteMaxCapacity int64 + WriteMaxCapacity int32 // WriteMinCapacity is the minimum provisioned write capacity. - WriteMinCapacity int64 + WriteMinCapacity int32 // WriteTargetValue is the ratio of consumed write to provisioned capacity. WriteTargetValue float64 @@ -203,10 +206,7 @@ type Log struct { *log.Entry // Config is a backend configuration Config - svc dynamodbiface.DynamoDBAPI - - // session holds the AWS client. - session *awssession.Session + svc *dynamodb.Client } type event struct { @@ -214,7 +214,7 @@ type event struct { EventIndex int64 EventType string CreatedAt int64 - Expires *int64 `json:"Expires,omitempty"` + Expires *int64 `json:"Expires,omitempty" dynamodbav:",omitempty"` FieldsMap events.EventFields EventNamespace string CreatedAtDate string @@ -265,51 +265,66 @@ func New(ctx context.Context, cfg Config) (*Log, error) { if err != nil { return nil, trace.Wrap(err) } - b := &Log{ - Entry: l, - Config: cfg, - } - awsConfig := aws.Config{} + opts := []func(*config.LoadOptions) error{ + config.WithRegion(cfg.Region), + config.WithHTTPClient(&http.Client{ + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + MaxIdleConns: defaults.HTTPMaxIdleConns, + MaxIdleConnsPerHost: defaults.HTTPMaxIdleConnsPerHost, + }, + }), + } - // Override the default environment's region if value set in YAML file: - if cfg.Region != "" { - awsConfig.Region = aws.String(cfg.Region) + awsConfig, err := config.LoadDefaultConfig(ctx, opts...) + if err != nil { + return nil, trace.Wrap(err) } + var dynamoOpts []func(*dynamodb.Options) + // Override the service endpoint using the "endpoint" query parameter from // "audit_events_uri". This is for non-AWS DynamoDB-compatible backends. if cfg.Endpoint != "" { - awsConfig.Endpoint = aws.String(cfg.Endpoint) + u, err := url.Parse(cfg.Endpoint) + if err != nil { + return nil, trace.BadParameter("configured DynamoDB events endpoint is invalid: %s", err.Error()) + } + + dynamoOpts = append(dynamoOpts, dynamodb.WithEndpointResolverV2(&staticResolver{endpoint: u})) } - b.session, err = awssession.NewSessionWithOptions(awssession.Options{ - SharedConfigState: awssession.SharedConfigEnable, - Config: awsConfig, - }) - if err != nil { - return nil, trace.Wrap(err) + // FIPS settings are applied on the individual service instead of the aws config, + // as DynamoDB Streams and Application Auto Scaling do not yet have FIPS endpoints in non-GovCloud. + // See also: https://aws.amazon.com/compliance/fips/#FIPS_Endpoints_by_Service + if modules.GetModules().IsBoringBinary() && cfg.UseFIPSEndpoint == types.ClusterAuditConfigSpecV2_FIPS_ENABLED { + dynamoOpts = append(dynamoOpts, func(o *dynamodb.Options) { + o.EndpointOptions.UseFIPSEndpoint = aws.FIPSEndpointStateEnabled + }) } - // Create DynamoDB service. - svc, err := dynamometrics.NewAPIMetrics(dynamometrics.Events, dynamodb.New(b.session, &aws.Config{ - // Setting this on the individual service instead of the session, as DynamoDB Streams - // and Application Auto Scaling do not yet have FIPS endpoints in non-GovCloud. - // See also: https://aws.amazon.com/compliance/fips/#FIPS_Endpoints_by_Service - UseFIPSEndpoint: events.FIPSProtoStateToAWSState(cfg.UseFIPSEndpoint), - })) - if err != nil { - return nil, trace.Wrap(err) + b := &Log{ + Entry: l, + Config: cfg, + svc: dynamodb.NewFromConfig(awsConfig, dynamoOpts...), } - b.svc = svc - if err := b.configureTable(ctx); err != nil { + if err := b.configureTable(ctx, applicationautoscaling.NewFromConfig(awsConfig)); err != nil { return nil, trace.Wrap(err) } return b, nil } +type staticResolver struct { + endpoint *url.URL +} + +func (s *staticResolver) ResolveEndpoint(ctx context.Context, params dynamodb.EndpointParameters) (smithyendpoints.Endpoint, error) { + return smithyendpoints.Endpoint{URI: *s.endpoint}, nil +} + type tableStatus int const ( @@ -319,7 +334,7 @@ const ( tableStatusOK ) -func (l *Log) configureTable(ctx context.Context) error { +func (l *Log) configureTable(ctx context.Context, svc *applicationautoscaling.Client) error { // check if the table exists? ts, err := l.getTableStatus(ctx, l.Tablename) if err != nil { @@ -337,18 +352,18 @@ func (l *Log) configureTable(ctx context.Context) error { return trace.Wrap(err) } tableName := aws.String(l.Tablename) - ttlStatus, err := l.svc.DescribeTimeToLiveWithContext(ctx, &dynamodb.DescribeTimeToLiveInput{ + ttlStatus, err := l.svc.DescribeTimeToLive(ctx, &dynamodb.DescribeTimeToLiveInput{ TableName: tableName, }) if err != nil { return trace.Wrap(convertError(err)) } - switch aws.StringValue(ttlStatus.TimeToLiveDescription.TimeToLiveStatus) { - case dynamodb.TimeToLiveStatusEnabled, dynamodb.TimeToLiveStatusEnabling: + switch ttlStatus.TimeToLiveDescription.TimeToLiveStatus { + case dynamodbtypes.TimeToLiveStatusEnabled, dynamodbtypes.TimeToLiveStatusEnabling: default: - _, err = l.svc.UpdateTimeToLiveWithContext(ctx, &dynamodb.UpdateTimeToLiveInput{ + _, err = l.svc.UpdateTimeToLive(ctx, &dynamodb.UpdateTimeToLiveInput{ TableName: tableName, - TimeToLiveSpecification: &dynamodb.TimeToLiveSpecification{ + TimeToLiveSpecification: &dynamodbtypes.TimeToLiveSpecification{ AttributeName: aws.String(keyExpires), Enabled: aws.Bool(true), }, @@ -361,8 +376,8 @@ func (l *Log) configureTable(ctx context.Context) error { // Enable continuous backups if requested. if l.Config.EnableContinuousBackups { // Make request to AWS to update continuous backups settings. - _, err := l.svc.UpdateContinuousBackupsWithContext(ctx, &dynamodb.UpdateContinuousBackupsInput{ - PointInTimeRecoverySpecification: &dynamodb.PointInTimeRecoverySpecification{ + _, err := l.svc.UpdateContinuousBackups(ctx, &dynamodb.UpdateContinuousBackupsInput{ + PointInTimeRecoverySpecification: &dynamodbtypes.PointInTimeRecoverySpecification{ PointInTimeRecoveryEnabled: aws.Bool(true), }, TableName: tableName, @@ -374,11 +389,9 @@ func (l *Log) configureTable(ctx context.Context) error { // Enable auto scaling if requested. if l.Config.EnableAutoScaling { - svc := applicationautoscaling.New(l.session) - type autoscalingParams struct { - readDimension string - writeDimension string + readDimension autoscalingtypes.ScalableDimension + writeDimension autoscalingtypes.ScalableDimension resourceID string readPolicy string writePolicy string @@ -386,15 +399,15 @@ func (l *Log) configureTable(ctx context.Context) error { params := []autoscalingParams{ { - readDimension: applicationautoscaling.ScalableDimensionDynamodbTableReadCapacityUnits, - writeDimension: applicationautoscaling.ScalableDimensionDynamodbTableWriteCapacityUnits, + readDimension: autoscalingtypes.ScalableDimensionDynamoDBTableReadCapacityUnits, + writeDimension: autoscalingtypes.ScalableDimensionDynamoDBTableWriteCapacityUnits, resourceID: fmt.Sprintf("table/%s", l.Tablename), readPolicy: fmt.Sprintf("%s-write-target-tracking-scaling-policy", l.Tablename), writePolicy: fmt.Sprintf("%s-write-target-tracking-scaling-policy", l.Tablename), }, { - readDimension: applicationautoscaling.ScalableDimensionDynamodbIndexReadCapacityUnits, - writeDimension: applicationautoscaling.ScalableDimensionDynamodbIndexWriteCapacityUnits, + readDimension: autoscalingtypes.ScalableDimensionDynamoDBIndexReadCapacityUnits, + writeDimension: autoscalingtypes.ScalableDimensionDynamoDBIndexWriteCapacityUnits, resourceID: fmt.Sprintf("table/%s/index/%s", l.Tablename, indexTimeSearchV2), readPolicy: fmt.Sprintf("%s/index/%s-write-target-tracking-scaling-policy", l.Tablename, indexTimeSearchV2), writePolicy: fmt.Sprintf("%s/index/%s-write-target-tracking-scaling-policy", l.Tablename, indexTimeSearchV2), @@ -403,36 +416,36 @@ func (l *Log) configureTable(ctx context.Context) error { for _, p := range params { // Define scaling targets. Defines minimum and maximum {read,write} capacity. - if _, err := svc.RegisterScalableTargetWithContext(ctx, &applicationautoscaling.RegisterScalableTargetInput{ - MinCapacity: aws.Int64(l.ReadMinCapacity), - MaxCapacity: aws.Int64(l.ReadMaxCapacity), + if _, err := svc.RegisterScalableTarget(ctx, &applicationautoscaling.RegisterScalableTargetInput{ + MinCapacity: aws.Int32(l.ReadMinCapacity), + MaxCapacity: aws.Int32(l.ReadMaxCapacity), ResourceId: aws.String(p.resourceID), - ScalableDimension: aws.String(p.readDimension), - ServiceNamespace: aws.String(applicationautoscaling.ServiceNamespaceDynamodb), + ScalableDimension: p.readDimension, + ServiceNamespace: autoscalingtypes.ServiceNamespaceDynamodb, }); err != nil { return trace.Wrap(convertError(err)) } - if _, err := svc.RegisterScalableTargetWithContext(ctx, &applicationautoscaling.RegisterScalableTargetInput{ - MinCapacity: aws.Int64(l.WriteMinCapacity), - MaxCapacity: aws.Int64(l.WriteMaxCapacity), + if _, err := svc.RegisterScalableTarget(ctx, &applicationautoscaling.RegisterScalableTargetInput{ + MinCapacity: aws.Int32(l.WriteMinCapacity), + MaxCapacity: aws.Int32(l.WriteMaxCapacity), ResourceId: aws.String(p.resourceID), - ScalableDimension: aws.String(p.writeDimension), - ServiceNamespace: aws.String(applicationautoscaling.ServiceNamespaceDynamodb), + ScalableDimension: p.writeDimension, + ServiceNamespace: autoscalingtypes.ServiceNamespaceDynamodb, }); err != nil { return trace.Wrap(convertError(err)) } // Define scaling policy. Defines the ratio of {read,write} consumed capacity to // provisioned capacity DynamoDB will try and maintain. - if _, err := svc.PutScalingPolicyWithContext(ctx, &applicationautoscaling.PutScalingPolicyInput{ + if _, err := svc.PutScalingPolicy(ctx, &applicationautoscaling.PutScalingPolicyInput{ PolicyName: aws.String(p.readPolicy), - PolicyType: aws.String(applicationautoscaling.PolicyTypeTargetTrackingScaling), + PolicyType: autoscalingtypes.PolicyTypeTargetTrackingScaling, ResourceId: aws.String(p.resourceID), - ScalableDimension: aws.String(p.readDimension), - ServiceNamespace: aws.String(applicationautoscaling.ServiceNamespaceDynamodb), - TargetTrackingScalingPolicyConfiguration: &applicationautoscaling.TargetTrackingScalingPolicyConfiguration{ - PredefinedMetricSpecification: &applicationautoscaling.PredefinedMetricSpecification{ - PredefinedMetricType: aws.String(applicationautoscaling.MetricTypeDynamoDbreadCapacityUtilization), + ScalableDimension: p.readDimension, + ServiceNamespace: autoscalingtypes.ServiceNamespaceDynamodb, + TargetTrackingScalingPolicyConfiguration: &autoscalingtypes.TargetTrackingScalingPolicyConfiguration{ + PredefinedMetricSpecification: &autoscalingtypes.PredefinedMetricSpecification{ + PredefinedMetricType: autoscalingtypes.MetricTypeDynamoDBReadCapacityUtilization, }, TargetValue: aws.Float64(l.ReadTargetValue), }, @@ -440,15 +453,15 @@ func (l *Log) configureTable(ctx context.Context) error { return trace.Wrap(convertError(err)) } - if _, err := svc.PutScalingPolicyWithContext(ctx, &applicationautoscaling.PutScalingPolicyInput{ + if _, err := svc.PutScalingPolicy(ctx, &applicationautoscaling.PutScalingPolicyInput{ PolicyName: aws.String(p.writePolicy), - PolicyType: aws.String(applicationautoscaling.PolicyTypeTargetTrackingScaling), + PolicyType: autoscalingtypes.PolicyTypeTargetTrackingScaling, ResourceId: aws.String(p.resourceID), - ScalableDimension: aws.String(p.writeDimension), - ServiceNamespace: aws.String(applicationautoscaling.ServiceNamespaceDynamodb), - TargetTrackingScalingPolicyConfiguration: &applicationautoscaling.TargetTrackingScalingPolicyConfiguration{ - PredefinedMetricSpecification: &applicationautoscaling.PredefinedMetricSpecification{ - PredefinedMetricType: aws.String(applicationautoscaling.MetricTypeDynamoDbwriteCapacityUtilization), + ScalableDimension: p.writeDimension, + ServiceNamespace: autoscalingtypes.ServiceNamespaceDynamodb, + TargetTrackingScalingPolicyConfiguration: &autoscalingtypes.TargetTrackingScalingPolicyConfiguration{ + PredefinedMetricSpecification: &autoscalingtypes.PredefinedMetricSpecification{ + PredefinedMetricType: autoscalingtypes.MetricTypeDynamoDBWriteCapacityUtilization, }, TargetValue: aws.Float64(l.WriteTargetValue), }, @@ -543,7 +556,7 @@ func (l *Log) putAuditEvent(ctx context.Context, sessionID string, in apievents. return trace.Wrap(err) } - if _, err = l.svc.PutItemWithContext(ctx, input); err != nil { + if _, err = l.svc.PutItem(ctx, input); err != nil { err = convertError(err) switch { @@ -584,7 +597,7 @@ func (l *Log) createPutItem(sessionID string, in apievents.AuditEvent) (*dynamod CreatedAtDate: in.GetTime().Format(iso8601DateFormat), } l.setExpiry(&e) - av, err := dynamodbattribute.MarshalMap(e) + av, err := attributevalue.MarshalMap(e) if err != nil { return nil, trace.Wrap(err) } @@ -638,7 +651,7 @@ type checkpointKey struct { Date string `json:"date,omitempty"` // A DynamoDB query iterator. Allows us to resume a partial query. - Iterator map[string]*dynamodb.AttributeValue `json:"iterator,omitempty"` + Iterator string `json:"iterator,omitempty"` // EventKey is a derived identifier for an event used for resuming // sub-page breaks due to size constraints. @@ -768,11 +781,11 @@ func (l *Log) searchEventsRaw(ctx context.Context, fromUTC, toUTC time.Time, nam } indexName := aws.String(indexTimeSearchV2) - var left int64 + var left int32 if limit != 0 { - left = int64(limit) + left = int32(limit) } else { - left = math.MaxInt64 + left = math.MaxInt32 } // Resume scanning at the correct date. We need to do this because we send individual queries per date @@ -861,11 +874,11 @@ func GetCreatedAtFromStartKey(startKey string) (time.Time, error) { if err != nil { return time.Time{}, trace.Wrap(err) } - if checkpoint.Iterator == nil { + if checkpoint.Iterator == "" { return time.Time{}, errors.New("missing iterator") } var e event - if err := dynamodbattribute.UnmarshalMap(checkpoint.Iterator, &e); err != nil { + if err := json.Unmarshal([]byte(checkpoint.Iterator), &e); err != nil { return time.Time{}, trace.Wrap(err) } if e.CreatedAt <= 0 { @@ -884,7 +897,7 @@ func getCheckpointFromStartKey(startKey string) (checkpointKey, error) { } // If a checkpoint key is provided, unmarshal it so we can work with it's parts. if err := json.Unmarshal([]byte(startKey), &checkpoint); err != nil { - return checkpoint, trace.Wrap(err) + return checkpointKey{}, trace.Wrap(err) } return checkpoint, nil } @@ -1015,7 +1028,7 @@ func fromWhereExpr(cond *types.WhereExpr, params *condFilterParams) (string, err // getTableStatus checks if a given table exists func (l *Log) getTableStatus(ctx context.Context, tableName string) (tableStatus, error) { - _, err := l.svc.DescribeTableWithContext(ctx, &dynamodb.DescribeTableInput{ + _, err := l.svc.DescribeTable(ctx, &dynamodb.DescribeTableInput{ TableName: aws.String(tableName), }) err = convertError(err) @@ -1030,7 +1043,7 @@ func (l *Log) getTableStatus(ctx context.Context, tableName string) (tableStatus // indexExists checks if a given index exists on a given table and that it is active or updating. func (l *Log) indexExists(ctx context.Context, tableName, indexName string) (bool, error) { - tableDescription, err := l.svc.DescribeTableWithContext(ctx, &dynamodb.DescribeTableInput{ + tableDescription, err := l.svc.DescribeTable(ctx, &dynamodb.DescribeTableInput{ TableName: aws.String(tableName), }) if err != nil { @@ -1038,7 +1051,7 @@ func (l *Log) indexExists(ctx context.Context, tableName, indexName string) (boo } for _, gsi := range tableDescription.Table.GlobalSecondaryIndexes { - if *gsi.IndexName == indexName && (*gsi.IndexStatus == dynamodb.IndexStatusActive || *gsi.IndexStatus == dynamodb.IndexStatusUpdating) { + if *gsi.IndexName == indexName && (gsi.IndexStatus == dynamodbtypes.IndexStatusActive || gsi.IndexStatus == dynamodbtypes.IndexStatusUpdating) { return true, nil } } @@ -1052,18 +1065,18 @@ func (l *Log) indexExists(ctx context.Context, tableName, indexName string) (boo // currently is always set to "FullPath" (used to be something else, that's // why it's a parameter for migration purposes) func (l *Log) createTable(ctx context.Context, tableName string) error { - provisionedThroughput := dynamodb.ProvisionedThroughput{ + provisionedThroughput := dynamodbtypes.ProvisionedThroughput{ ReadCapacityUnits: aws.Int64(l.ReadCapacityUnits), WriteCapacityUnits: aws.Int64(l.WriteCapacityUnits), } - elems := []*dynamodb.KeySchemaElement{ + elems := []dynamodbtypes.KeySchemaElement{ { AttributeName: aws.String(keySessionID), - KeyType: aws.String("HASH"), + KeyType: dynamodbtypes.KeyTypeHash, }, { AttributeName: aws.String(keyEventIndex), - KeyType: aws.String("RANGE"), + KeyType: dynamodbtypes.KeyTypeRange, }, } c := dynamodb.CreateTableInput{ @@ -1071,38 +1084,41 @@ func (l *Log) createTable(ctx context.Context, tableName string) error { AttributeDefinitions: tableSchema, KeySchema: elems, ProvisionedThroughput: &provisionedThroughput, - GlobalSecondaryIndexes: []*dynamodb.GlobalSecondaryIndex{ + GlobalSecondaryIndexes: []dynamodbtypes.GlobalSecondaryIndex{ { IndexName: aws.String(indexTimeSearchV2), - KeySchema: []*dynamodb.KeySchemaElement{ + KeySchema: []dynamodbtypes.KeySchemaElement{ { // Partition by date instead of namespace. AttributeName: aws.String(keyDate), - KeyType: aws.String("HASH"), + KeyType: dynamodbtypes.KeyTypeHash, }, { AttributeName: aws.String(keyCreatedAt), - KeyType: aws.String("RANGE"), + KeyType: dynamodbtypes.KeyTypeRange, }, }, - Projection: &dynamodb.Projection{ - ProjectionType: aws.String("ALL"), + Projection: &dynamodbtypes.Projection{ + ProjectionType: dynamodbtypes.ProjectionTypeAll, }, ProvisionedThroughput: &provisionedThroughput, }, }, } - _, err := l.svc.CreateTableWithContext(ctx, &c) + _, err := l.svc.CreateTable(ctx, &c) if err != nil { return trace.Wrap(err) } log.Infof("Waiting until table %q is created", tableName) - err = l.svc.WaitUntilTableExistsWithContext(ctx, &dynamodb.DescribeTableInput{ - TableName: aws.String(tableName), - }) + waiter := dynamodb.NewTableExistsWaiter(l.svc) + err = waiter.Wait(ctx, + &dynamodb.DescribeTableInput{TableName: aws.String(tableName)}, + 10*time.Minute, + ) if err == nil { log.Infof("Table %q has been created", tableName) } + return trace.Wrap(err) } @@ -1113,15 +1129,15 @@ func (l *Log) Close() error { // deleteAllItems deletes all items from the database, used in tests func (l *Log) deleteAllItems(ctx context.Context) error { - out, err := l.svc.ScanWithContext(ctx, &dynamodb.ScanInput{TableName: aws.String(l.Tablename)}) + out, err := l.svc.Scan(ctx, &dynamodb.ScanInput{TableName: aws.String(l.Tablename)}) if err != nil { return trace.Wrap(err) } - var requests []*dynamodb.WriteRequest + var requests []dynamodbtypes.WriteRequest for _, item := range out.Items { - requests = append(requests, &dynamodb.WriteRequest{ - DeleteRequest: &dynamodb.DeleteRequest{ - Key: map[string]*dynamodb.AttributeValue{ + requests = append(requests, dynamodbtypes.WriteRequest{ + DeleteRequest: &dynamodbtypes.DeleteRequest{ + Key: map[string]dynamodbtypes.AttributeValue{ keySessionID: item[keySessionID], keyEventIndex: item[keyEventIndex], }, @@ -1137,8 +1153,8 @@ func (l *Log) deleteAllItems(ctx context.Context) error { chunk := requests[:top] requests = requests[top:] - _, err := l.svc.BatchWriteItemWithContext(ctx, &dynamodb.BatchWriteItemInput{ - RequestItems: map[string][]*dynamodb.WriteRequest{ + _, err := l.svc.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{ + RequestItems: map[string][]dynamodbtypes.WriteRequest{ l.Tablename: chunk, }, }) @@ -1154,15 +1170,20 @@ func (l *Log) deleteAllItems(ctx context.Context) error { // deleteTable deletes DynamoDB table with a given name func (l *Log) deleteTable(ctx context.Context, tableName string, wait bool) error { tn := aws.String(tableName) - _, err := l.svc.DeleteTableWithContext(ctx, &dynamodb.DeleteTableInput{TableName: tn}) + _, err := l.svc.DeleteTable(ctx, &dynamodb.DeleteTableInput{TableName: tn}) if err != nil { return trace.Wrap(err) } - if wait { - return trace.Wrap( - l.svc.WaitUntilTableNotExistsWithContext(ctx, &dynamodb.DescribeTableInput{TableName: tn})) + if !wait { + return nil } - return nil + + waiter := dynamodb.NewTableNotExistsWaiter(l.svc) + + return trace.Wrap(waiter.Wait(ctx, + &dynamodb.DescribeTableInput{TableName: tn}, + 10*time.Minute, + )) } var errAWSValidation = errors.New("aws validation error") @@ -1171,34 +1192,47 @@ func convertError(err error) error { if err == nil { return nil } - var aerr awserr.Error - if !errors.As(err, &aerr) { - return err + + var conditionalCheckFailedError *dynamodbtypes.ConditionalCheckFailedException + if errors.As(err, &conditionalCheckFailedError) { + return trace.AlreadyExists(conditionalCheckFailedError.ErrorMessage()) } - switch aerr.Code() { - case dynamodb.ErrCodeConditionalCheckFailedException: - return trace.AlreadyExists(aerr.Error()) - case dynamodb.ErrCodeProvisionedThroughputExceededException: - return trace.ConnectionProblem(aerr, aerr.Error()) - case dynamodb.ErrCodeResourceNotFoundException: - return trace.NotFound(aerr.Error()) - case dynamodb.ErrCodeItemCollectionSizeLimitExceededException: - return trace.BadParameter(aerr.Error()) - case dynamodb.ErrCodeInternalServerError: - return trace.BadParameter(aerr.Error()) - case ErrValidationException: - // A ValidationException type is missing from AWS SDK. - // Use errAWSValidation that for most cases will contain: - // "Item size has exceeded the maximum allowed size" AWS validation error. - return trace.Wrap(errAWSValidation, aerr.Error()) - default: - return err + var throughputExceededError *dynamodbtypes.ProvisionedThroughputExceededException + if errors.As(err, &throughputExceededError) { + return trace.ConnectionProblem(throughputExceededError, throughputExceededError.ErrorMessage()) + } + + var notFoundError *dynamodbtypes.ResourceNotFoundException + if errors.As(err, ¬FoundError) { + return trace.NotFound(notFoundError.ErrorMessage()) + } + + var collectionLimitExceededError *dynamodbtypes.ItemCollectionSizeLimitExceededException + if errors.As(err, ¬FoundError) { + return trace.BadParameter(collectionLimitExceededError.ErrorMessage()) + } + + var internalError *dynamodbtypes.InternalServerError + if errors.As(err, &internalError) { + return trace.BadParameter(internalError.ErrorMessage()) + } + + var ae smithy.APIError + if errors.As(err, &ae) { + if ae.ErrorCode() == ErrValidationException { + // A ValidationException type is missing from AWS SDK. + // Use errAWSValidation that for most cases will contain: + // "Item size has exceeded the maximum allowed size" AWS validation error. + return trace.Wrap(errAWSValidation, ae.Error()) + } } + + return err } type query interface { - QueryWithContext(ctx context.Context, input *dynamodb.QueryInput, opts ...request.Option) (*dynamodb.QueryOutput, error) + Query(ctx context.Context, params *dynamodb.QueryInput, optFns ...func(*dynamodb.Options)) (*dynamodb.QueryOutput, error) } type eventsFetcher struct { @@ -1210,7 +1244,7 @@ type eventsFetcher struct { checkpoint *checkpointKey foundStart bool dates []string - left int64 + left int32 fromUTC time.Time toUTC time.Time @@ -1221,13 +1255,26 @@ type eventsFetcher struct { } func (l *eventsFetcher) processQueryOutput(output *dynamodb.QueryOutput, hasLeftFun func() bool) ([]event, bool, error) { - var out []event oldIterator := l.checkpoint.Iterator - l.checkpoint.Iterator = output.LastEvaluatedKey + l.checkpoint.Iterator = "" + + if output.LastEvaluatedKey != nil { + m := make(map[string]any) + if err := attributevalue.UnmarshalMap(output.LastEvaluatedKey, &m); err != nil { + return nil, false, trace.Wrap(err) + } + + iter, err := json.Marshal(&m) + if err != nil { + return nil, false, err + } + l.checkpoint.Iterator = string(iter) + } + var out []event for _, item := range output.Items { var e event - if err := dynamodbattribute.UnmarshalMap(item, &e); err != nil { + if err := attributevalue.UnmarshalMap(item, &e); err != nil { return nil, false, trace.WrapWithMessage(err, "failed to unmarshal event") } data, err := json.Marshal(e.FieldsMap) @@ -1272,7 +1319,7 @@ func (l *eventsFetcher) processQueryOutput(output *dynamodb.QueryOutput, hasLeft if hasLeftFun != nil { hf = hasLeftFun() } - l.hasLeft = hf || len(l.checkpoint.Iterator) != 0 + l.hasLeft = hf || l.checkpoint.Iterator != "" l.checkpoint.EventKey = "" return out, true, nil } @@ -1282,10 +1329,6 @@ func (l *eventsFetcher) processQueryOutput(output *dynamodb.QueryOutput, hasLeft func (l *eventsFetcher) QueryByDateIndex(ctx context.Context, filterExpr *string) (values []event, err error) { query := "CreatedAtDate = :date AND CreatedAt BETWEEN :start and :end" - var attributeNames map[string]*string - if len(l.filter.condParams.attrNames) > 0 { - attributeNames = aws.StringMap(l.filter.condParams.attrNames) - } dateLoop: for i, date := range l.dates { @@ -1300,7 +1343,7 @@ dateLoop: attributes[fmt.Sprintf(":eventType%d", i)] = eventType } maps.Copy(attributes, l.filter.condParams.attrValues) - attributeValues, err := dynamodbattribute.MarshalMap(attributes) + attributeValues, err := attributevalue.MarshalMap(attributes) if err != nil { return nil, trace.Wrap(err) } @@ -1308,16 +1351,29 @@ dateLoop: input := dynamodb.QueryInput{ KeyConditionExpression: aws.String(query), TableName: aws.String(l.tableName), - ExpressionAttributeNames: attributeNames, + ExpressionAttributeNames: l.filter.condParams.attrNames, ExpressionAttributeValues: attributeValues, IndexName: aws.String(indexTimeSearchV2), - ExclusiveStartKey: l.checkpoint.Iterator, - Limit: aws.Int64(l.left), + Limit: aws.Int32(l.left), FilterExpression: filterExpr, ScanIndexForward: aws.Bool(l.forward), } + + if l.checkpoint.Iterator != "" { + m := make(map[string]any) + err = json.Unmarshal([]byte(l.checkpoint.Iterator), &m) + if err != nil { + return nil, trace.Wrap(err) + } + + input.ExclusiveStartKey, err = attributevalue.MarshalMap(&m) + if err != nil { + return nil, trace.Wrap(err) + } + } + start := time.Now() - out, err := l.api.QueryWithContext(ctx, &input) + out, err := l.api.Query(ctx, &input) if err != nil { return nil, trace.Wrap(err) } @@ -1346,12 +1402,12 @@ dateLoop: // from the same date and the request's iterator to fetch the remainder of the page. // If the input iterator is empty but the EventKey is not, we need to resume the query from the same date // and we shouldn't move to the next date. - if i < len(l.dates)-1 && len(l.checkpoint.Iterator) == 0 && l.checkpoint.EventKey == "" { + if i < len(l.dates)-1 && l.checkpoint.Iterator == "" && l.checkpoint.EventKey == "" { l.checkpoint.Date = l.dates[i+1] } return values, nil } - if len(l.checkpoint.Iterator) == 0 { + if l.checkpoint.Iterator == "" { continue dateLoop } } @@ -1361,10 +1417,6 @@ dateLoop: func (l *eventsFetcher) QueryBySessionIDIndex(ctx context.Context, sessionID string, filterExpr *string) (values []event, err error) { query := "SessionID = :id" - var attributeNames map[string]*string - if len(l.filter.condParams.attrNames) > 0 { - attributeNames = aws.StringMap(l.filter.condParams.attrNames) - } attributes := map[string]interface{}{ ":id": sessionID, @@ -1374,23 +1426,35 @@ func (l *eventsFetcher) QueryBySessionIDIndex(ctx context.Context, sessionID str } maps.Copy(attributes, l.filter.condParams.attrValues) - attributeValues, err := dynamodbattribute.MarshalMap(attributes) + attributeValues, err := attributevalue.MarshalMap(attributes) if err != nil { return nil, trace.Wrap(err) } input := dynamodb.QueryInput{ KeyConditionExpression: aws.String(query), TableName: aws.String(l.tableName), - ExpressionAttributeNames: attributeNames, + ExpressionAttributeNames: l.filter.condParams.attrNames, ExpressionAttributeValues: attributeValues, IndexName: nil, // Use primary SessionID index. - ExclusiveStartKey: l.checkpoint.Iterator, - Limit: aws.Int64(l.left), + Limit: aws.Int32(l.left), FilterExpression: filterExpr, ScanIndexForward: aws.Bool(l.forward), } + + if l.checkpoint.Iterator != "" { + m := make(map[string]string) + if err = json.Unmarshal([]byte(l.checkpoint.Iterator), &m); err != nil { + return nil, trace.Wrap(err) + } + + input.ExclusiveStartKey, err = attributevalue.MarshalMap(&m) + if err != nil { + return nil, trace.Wrap(err) + } + } + start := time.Now() - out, err := l.api.QueryWithContext(ctx, &input) + out, err := l.api.Query(ctx, &input) if err != nil { return nil, trace.Wrap(err) } diff --git a/lib/events/dynamoevents/dynamoevents_test.go b/lib/events/dynamoevents/dynamoevents_test.go index c387da2317b86..4123be5064fa2 100644 --- a/lib/events/dynamoevents/dynamoevents_test.go +++ b/lib/events/dynamoevents/dynamoevents_test.go @@ -23,6 +23,8 @@ import ( "encoding/json" "fmt" "math/rand" + "net/http" + "net/http/httptest" "net/url" "os" "strconv" @@ -65,7 +67,6 @@ func setupDynamoContext(t *testing.T) *dynamoContext { fakeClock := clockwork.NewFakeClockAt(time.Now().UTC()) log, err := New(context.Background(), Config{ - Region: "us-east-1", Tablename: fmt.Sprintf("teleport-test-%v", uuid.New().String()), Clock: fakeClock, UIDGenerator: utils.NewFakeUID(), @@ -589,3 +590,25 @@ func randStringAlpha(n int) string { } return string(b) } + +func TestCustomEndpoint(t *testing.T) { + ctx := context.Background() + t.Setenv("AWS_ACCESS_KEY", "llama") + t.Setenv("AWS_SECRET_KEY", "alpaca") + + mux := http.NewServeMux() + mux.HandleFunc("/*", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusTeapot) + }) + srv := httptest.NewServer(mux) + defer srv.Close() + + b, err := New(ctx, Config{ + Tablename: "teleport-test", + UIDGenerator: utils.NewFakeUID(), + Endpoint: srv.URL, + }) + assert.Error(t, err) + assert.Nil(t, b) + require.Contains(t, err.Error(), fmt.Sprintf("StatusCode: %d", http.StatusTeapot)) +} diff --git a/lib/events/test/suite.go b/lib/events/test/suite.go index 4219b28e9b301..33d9f06431065 100644 --- a/lib/events/test/suite.go +++ b/lib/events/test/suite.go @@ -34,7 +34,6 @@ import ( "github.com/gravitational/teleport/api/types" apievents "github.com/gravitational/teleport/api/types/events" - "github.com/gravitational/teleport/api/utils/retryutils" "github.com/gravitational/teleport/lib/events" "github.com/gravitational/teleport/lib/fixtures" "github.com/gravitational/teleport/lib/session" @@ -283,20 +282,16 @@ func (s *EventsSuite) SessionEventsCRUD(t *testing.T) { var history []apievents.AuditEvent ctx := context.Background() - err = retryutils.RetryStaticFor(time.Minute*5, time.Second*5, func() error { + require.EventuallyWithT(t, func(t *assert.CollectT) { history, _, err = s.Log.SearchEvents(ctx, events.SearchEventsRequest{ From: loginTime.Add(-1 * time.Hour), To: loginTime.Add(time.Hour), Limit: 100, Order: types.EventOrderAscending, }) - if err != nil { - t.Logf("Retrying searching of events because of: %v", err) - } - return err - }) - require.NoError(t, err) - require.Len(t, history, 1) + assert.NoError(t, err) + assert.Len(t, history, 1) + }, 10*time.Second, 500*time.Millisecond) // start the session and emit data stream to it and wrap it up sessionID := session.NewID() @@ -339,20 +334,17 @@ func (s *EventsSuite) SessionEventsCRUD(t *testing.T) { require.NoError(t, err) // search for the session event. - err = retryutils.RetryStaticFor(time.Minute*5, time.Second*5, func() error { + require.EventuallyWithT(t, func(t *assert.CollectT) { history, _, err = s.Log.SearchEvents(ctx, events.SearchEventsRequest{ From: s.Clock.Now().UTC().Add(-1 * time.Hour), To: s.Clock.Now().UTC().Add(time.Hour), Limit: 100, Order: types.EventOrderAscending, }) - if err != nil { - t.Logf("Retrying searching of events because of: %v", err) - } - return err - }) - require.NoError(t, err) - require.Len(t, history, 3) + + assert.NoError(t, err) + assert.Len(t, history, 3) + }, 10*time.Second, 500*time.Millisecond) require.Equal(t, events.SessionStartEvent, history[1].GetType()) require.Equal(t, events.SessionEndEvent, history[2].GetType()) diff --git a/lib/service/service.go b/lib/service/service.go index df9a886a2fd72..64cc5d059d7ae 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -1755,11 +1755,11 @@ func (process *TeleportProcess) initAuthExternalAuditLog(auditConfig types.Clust Region: auditConfig.Region(), EnableContinuousBackups: auditConfig.EnableContinuousBackups(), EnableAutoScaling: auditConfig.EnableAutoScaling(), - ReadMinCapacity: auditConfig.ReadMinCapacity(), - ReadMaxCapacity: auditConfig.ReadMaxCapacity(), + ReadMinCapacity: int32(auditConfig.ReadMinCapacity()), + ReadMaxCapacity: int32(auditConfig.ReadMaxCapacity()), ReadTargetValue: auditConfig.ReadTargetValue(), - WriteMinCapacity: auditConfig.WriteMinCapacity(), - WriteMaxCapacity: auditConfig.WriteMaxCapacity(), + WriteMinCapacity: int32(auditConfig.WriteMinCapacity()), + WriteMaxCapacity: int32(auditConfig.WriteMaxCapacity()), WriteTargetValue: auditConfig.WriteTargetValue(), RetentionPeriod: auditConfig.RetentionPeriod(), UseFIPSEndpoint: auditConfig.GetUseFIPSEndpoint(),