diff --git a/.github/ISSUE_TEMPLATE/bug_report.yml b/.github/ISSUE_TEMPLATE/bug_report.yml
index a98a2dae..780ff58a 100644
--- a/.github/ISSUE_TEMPLATE/bug_report.yml
+++ b/.github/ISSUE_TEMPLATE/bug_report.yml
@@ -59,6 +59,8 @@ body:
label: AWS Lambda function runtime
options:
- dotnet6
+ - dotnet8
+ - dotnet8 (AOT)
validations:
required: true
- type: textarea
diff --git a/docs/snippets/batch/templates/dynamodb.yaml b/docs/snippets/batch/templates/dynamodb.yaml
index 0508117a..13c83673 100644
--- a/docs/snippets/batch/templates/dynamodb.yaml
+++ b/docs/snippets/batch/templates/dynamodb.yaml
@@ -1,105 +1,118 @@
-AWSTemplateFormatVersion: '2010-09-09'
+AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31
-Description: partial batch response sample
+Description: Example project demoing DynamoDB Streams processing using the Batch Processing Utility in Powertools for AWS Lambda (.NET)
Globals:
Function:
- Timeout: 5
- MemorySize: 256
- Runtime: nodejs18.x
- Tracing: Active
+ Timeout: 20
+ Runtime: dotnet8
+ MemorySize: 1024
Environment:
Variables:
- POWERTOOLS_SERVICE_NAME: powertools-dotnet-sample-batch-processing
+ POWERTOOLS_SERVICE_NAME: powertools-dotnet-sample-batch-ddb
POWERTOOLS_LOG_LEVEL: Debug
- POWERTOOLS_LOGGER_CASE: PascalCase # Allowed values are: CamelCase, PascalCase and SnakeCase
+ POWERTOOLS_LOGGER_CASE: PascalCase
POWERTOOLS_BATCH_ERROR_HANDLING_POLICY: DeriveFromEvent
POWERTOOLS_BATCH_MAX_DEGREE_OF_PARALLELISM: 1
- POWERTOOLS_BATCH_PARALLEL_ENABLED: false
-
-Resources:
- HelloWorldFunction:
- Type: AWS::Serverless::Function
- Properties:
- CodeUri: ./src/HelloWorld/
- Handler: HelloWorld::HelloWorld.Function::DynamoDbStreamHandlerUsingAttribute
- Policies:
- # Lambda Destinations require additional permissions
- # to send failure records from Kinesis/DynamoDB
- - Version: '2012-10-17'
- Statement:
- Effect: 'Allow'
- Action:
- - sqs:GetQueueAttributes
- - sqs:GetQueueUrl
- - sqs:SendMessage
- Resource: !GetAtt SampleDLQ.Arn
- - KMSDecryptPolicy:
- KeyId: !Ref CustomerKey
- Events:
- DynamoDBStream:
- Type: DynamoDB
- Properties:
- Stream: !GetAtt SampleTable.StreamArn
- StartingPosition: LATEST
- MaximumRetryAttempts: 2
- DestinationConfig:
- OnFailure:
- Destination: !GetAtt SampleDLQ.Arn
- FunctionResponseTypes:
- - ReportBatchItemFailures
+ POWERTOOLS_BATCH_PARALLEL_ENABLED : false
+ POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE: true
- SampleDLQ:
- Type: AWS::SQS::Queue
- Properties:
- KmsMasterKeyId: !Ref CustomerKey
+Resources:
- SampleTable:
- Type: AWS::DynamoDB::Table
- Properties:
- BillingMode: PAY_PER_REQUEST
- AttributeDefinitions:
- - AttributeName: pk
- AttributeType: S
- - AttributeName: sk
- AttributeType: S
- KeySchema:
- - AttributeName: pk
- KeyType: HASH
- - AttributeName: sk
- KeyType: RANGE
- SSESpecification:
- SSEEnabled: true
- StreamSpecification:
- StreamViewType: NEW_AND_OLD_IMAGES
-
# --------------
- # KMS key for encrypted queues
+ # KMS key for encrypted messages / records
CustomerKey:
Type: AWS::KMS::Key
Properties:
Description: KMS key for encrypted queues
Enabled: true
KeyPolicy:
- Version: '2012-10-17'
+ Version: "2012-10-17"
Statement:
- Sid: Enable IAM User Permissions
Effect: Allow
Principal:
- AWS: !Sub 'arn:aws:iam::${AWS::AccountId}:root'
- Action: 'kms:*'
- Resource: '*'
- - Sid: Allow use of the key
+ AWS: !Sub "arn:aws:iam::${AWS::AccountId}:root"
+ Action: "kms:*"
+ Resource: "*"
+ - Sid: Allow AWS Lambda to use the key
Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action:
- kms:Decrypt
- kms:GenerateDataKey
- Resource: '*'
+ Resource: "*"
CustomerKeyAlias:
Type: AWS::KMS::Alias
Properties:
- AliasName: alias/powertools-batch-sqs-demo
- TargetKeyId: !Ref CustomerKey
\ No newline at end of file
+ AliasName: !Sub alias/${AWS::StackName}-kms-key
+ TargetKeyId: !Ref CustomerKey
+
+ # --------------
+ # Batch Processing for DynamoDb (DDB) Stream
+ DdbStreamDeadLetterQueue:
+ Type: AWS::SQS::Queue
+ Properties:
+ KmsMasterKeyId: !Ref CustomerKey
+
+ DdbTable:
+ Type: AWS::DynamoDB::Table
+ Properties:
+ BillingMode: PAY_PER_REQUEST
+ AttributeDefinitions:
+ - AttributeName: id
+ AttributeType: S
+ KeySchema:
+ - AttributeName: id
+ KeyType: HASH
+ StreamSpecification:
+ StreamViewType: NEW_AND_OLD_IMAGES
+
+ DdbStreamBatchProcessorFunction:
+ Type: AWS::Serverless::Function
+ Properties:
+ CodeUri: ./src/HelloWorld/
+ Handler: HelloWorld::HelloWorld.Function::DynamoDbStreamHandlerUsingAttribute
+ Policies:
+ - AWSLambdaDynamoDBExecutionRole
+ - Statement:
+ - Sid: DlqPermissions
+ Effect: Allow
+ Action:
+ - sqs:SendMessage
+ - sqs:SendMessageBatch
+ Resource: !GetAtt DdbStreamDeadLetterQueue.Arn
+ - Sid: KmsKeyPermissions
+ Effect: Allow
+ Action:
+ - kms:GenerateDataKey
+ Resource: !GetAtt CustomerKey.Arn
+ Events:
+ Stream:
+ Type: DynamoDB
+ Properties:
+ BatchSize: 5
+ BisectBatchOnFunctionError: true
+ DestinationConfig:
+ OnFailure:
+ Destination: !GetAtt DdbStreamDeadLetterQueue.Arn
+ Enabled: true
+ FunctionResponseTypes:
+ - ReportBatchItemFailures
+ MaximumRetryAttempts: 2
+ ParallelizationFactor: 1
+ StartingPosition: LATEST
+ Stream: !GetAtt DdbTable.StreamArn
+
+ DdbStreamBatchProcessorFunctionLogGroup:
+ Type: AWS::Logs::LogGroup
+ Properties:
+ LogGroupName: !Sub "/aws/lambda/${DdbStreamBatchProcessorFunction}"
+ RetentionInDays: 7
+
+Outputs:
+ DdbTableName:
+ Description: "DynamoDB Table Name"
+ Value: !Ref DdbTable
\ No newline at end of file
diff --git a/docs/snippets/batch/templates/kinesis.yaml b/docs/snippets/batch/templates/kinesis.yaml
index 5088fbf4..911bd151 100644
--- a/docs/snippets/batch/templates/kinesis.yaml
+++ b/docs/snippets/batch/templates/kinesis.yaml
@@ -1,95 +1,125 @@
-AWSTemplateFormatVersion: '2010-09-09'
+AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31
-Description: partial batch response sample
+Description: Example project demoing Kinesis Data Streams processing using the Batch Processing Utility in Powertools for AWS Lambda (.NET)
Globals:
Function:
- Timeout: 5
- MemorySize: 256
- Runtime: nodejs18.x
- Tracing: Active
+ Timeout: 20
+ Runtime: dotnet8
+ MemorySize: 1024
Environment:
Variables:
- POWERTOOLS_SERVICE_NAME: powertools-dotnet-sample-batch-processing
+ POWERTOOLS_SERVICE_NAME: powertools-dotnet-sample-batch-kinesis
POWERTOOLS_LOG_LEVEL: Debug
- POWERTOOLS_LOGGER_CASE: PascalCase # Allowed values are: CamelCase, PascalCase and SnakeCase
+ POWERTOOLS_LOGGER_CASE: PascalCase
POWERTOOLS_BATCH_ERROR_HANDLING_POLICY: DeriveFromEvent
POWERTOOLS_BATCH_MAX_DEGREE_OF_PARALLELISM: 1
- POWERTOOLS_BATCH_PARALLEL_ENABLED: false
+ POWERTOOLS_BATCH_PARALLEL_ENABLED : false
+ POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE: true
Resources:
- HelloWorldFunction:
- Type: AWS::Serverless::Function
- Properties:
- CodeUri: ./src/HelloWorld/
- Handler: HelloWorld::HelloWorld.Function::KinesisEventHandlerUsingAttribute
- Policies:
- # Lambda Destinations require additional permissions
- # to send failure records to DLQ from Kinesis/DynamoDB
- - Version: '2012-10-17'
- Statement:
- Effect: 'Allow'
- Action:
- - sqs:GetQueueAttributes
- - sqs:GetQueueUrl
- - sqs:SendMessage
- Resource: !GetAtt SampleDLQ.Arn
- - KMSDecryptPolicy:
- KeyId: !Ref CustomerKey
- Events:
- KinesisStream:
- Type: Kinesis
- Properties:
- Stream: !GetAtt SampleStream.Arn
- BatchSize: 100
- StartingPosition: LATEST
- MaximumRetryAttempts: 2
- DestinationConfig:
- OnFailure:
- Destination: !GetAtt SampleDLQ.Arn
- FunctionResponseTypes:
- - ReportBatchItemFailures
- SampleDLQ:
- Type: AWS::SQS::Queue
- Properties:
- KmsMasterKeyId: !Ref CustomerKey
-
- SampleStream:
- Type: AWS::Kinesis::Stream
- Properties:
- ShardCount: 1
- StreamEncryption:
- EncryptionType: KMS
- KeyId: alias/aws/kinesis
-
# --------------
- # KMS key for encrypted queues
+ # KMS key for encrypted messages / records
CustomerKey:
Type: AWS::KMS::Key
Properties:
Description: KMS key for encrypted queues
Enabled: true
KeyPolicy:
- Version: '2012-10-17'
+ Version: "2012-10-17"
Statement:
- Sid: Enable IAM User Permissions
Effect: Allow
Principal:
- AWS: !Sub 'arn:aws:iam::${AWS::AccountId}:root'
- Action: 'kms:*'
- Resource: '*'
- - Sid: Allow use of the key
+ AWS: !Sub "arn:aws:iam::${AWS::AccountId}:root"
+ Action: "kms:*"
+ Resource: "*"
+ - Sid: Allow AWS Lambda to use the key
Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action:
- kms:Decrypt
- kms:GenerateDataKey
- Resource: '*'
+ Resource: "*"
CustomerKeyAlias:
Type: AWS::KMS::Alias
Properties:
- AliasName: alias/powertools-batch-sqs-demo
- TargetKeyId: !Ref CustomerKey
\ No newline at end of file
+ AliasName: !Sub alias/${AWS::StackName}-kms-key
+ TargetKeyId: !Ref CustomerKey
+
+ # --------------
+ # Batch Processing for Kinesis Data Stream
+ KinesisStreamDeadLetterQueue:
+ Type: AWS::SQS::Queue
+ Properties:
+ KmsMasterKeyId: !Ref CustomerKey
+
+ KinesisStream:
+ Type: AWS::Kinesis::Stream
+ Properties:
+ ShardCount: 1
+ StreamEncryption:
+ EncryptionType: KMS
+ KeyId: !Ref CustomerKey
+
+ KinesisStreamConsumer:
+ Type: AWS::Kinesis::StreamConsumer
+ Properties:
+ ConsumerName: powertools-dotnet-sample-batch-kds-consumer
+ StreamARN: !GetAtt KinesisStream.Arn
+
+ KinesisBatchProcessorFunction:
+ Type: AWS::Serverless::Function
+ Properties:
+ Policies:
+ - Statement:
+ - Sid: KinesisStreamConsumerPermissions
+ Effect: Allow
+ Action:
+ - kinesis:DescribeStreamConsumer
+ Resource:
+ - !GetAtt KinesisStreamConsumer.ConsumerARN
+ - Sid: DlqPermissions
+ Effect: Allow
+ Action:
+ - sqs:SendMessage
+ - sqs:SendMessageBatch
+ Resource: !GetAtt KinesisStreamDeadLetterQueue.Arn
+ - Sid: KmsKeyPermissions
+ Effect: Allow
+ Action:
+ - kms:Decrypt
+ - kms:GenerateDataKey
+ Resource: !GetAtt CustomerKey.Arn
+ CodeUri: ./src/HelloWorld/
+ Handler: HelloWorld::HelloWorld.Function::KinesisEventHandlerUsingAttribute
+ Events:
+ Kinesis:
+ Type: Kinesis
+ Properties:
+ BatchSize: 5
+ BisectBatchOnFunctionError: true
+ DestinationConfig:
+ OnFailure:
+ Destination: !GetAtt KinesisStreamDeadLetterQueue.Arn
+ Enabled: true
+ FunctionResponseTypes:
+ - ReportBatchItemFailures
+ MaximumRetryAttempts: 2
+ ParallelizationFactor: 1
+ StartingPosition: LATEST
+ Stream: !GetAtt KinesisStreamConsumer.ConsumerARN
+
+ KinesisBatchProcessorFunctionLogGroup:
+ Type: AWS::Logs::LogGroup
+ Properties:
+ LogGroupName: !Sub "/aws/lambda/${KinesisBatchProcessorFunction}"
+ RetentionInDays: 7
+
+Outputs:
+ KinesisStreamArn:
+ Description: "Kinesis Stream ARN"
+ Value: !GetAtt KinesisStream.Arn
\ No newline at end of file
diff --git a/docs/snippets/batch/templates/sqs.yaml b/docs/snippets/batch/templates/sqs.yaml
index dfda6bed..fbd21305 100644
--- a/docs/snippets/batch/templates/sqs.yaml
+++ b/docs/snippets/batch/templates/sqs.yaml
@@ -1,83 +1,106 @@
-AWSTemplateFormatVersion: '2010-09-09'
+AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31
-Description: partial batch response sample
+Description: Example project demoing SQS Queue processing using the Batch Processing Utility in Powertools for AWS Lambda (.NET)
Globals:
Function:
- Timeout: 5
- MemorySize: 256
- Runtime: nodejs18.x
- Tracing: Active
+ Timeout: 20
+ Runtime: dotnet8
+ MemorySize: 1024
Environment:
Variables:
- POWERTOOLS_SERVICE_NAME: powertools-dotnet-sample-batch-processing
+ POWERTOOLS_SERVICE_NAME: powertools-dotnet-sample-batch-sqs
POWERTOOLS_LOG_LEVEL: Debug
- POWERTOOLS_LOGGER_CASE: PascalCase # Allowed values are: CamelCase, PascalCase and SnakeCase
+ POWERTOOLS_LOGGER_CASE: PascalCase
POWERTOOLS_BATCH_ERROR_HANDLING_POLICY: DeriveFromEvent
POWERTOOLS_BATCH_MAX_DEGREE_OF_PARALLELISM: 1
- POWERTOOLS_BATCH_PARALLEL_ENABLED: false
+ POWERTOOLS_BATCH_PARALLEL_ENABLED : false
+ POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE: true
Resources:
- HelloWorldFunction:
- Type: AWS::Serverless::Function
- Properties:
- CodeUri: ./src/HelloWorld/
- Handler: HelloWorld::HelloWorld.Function::SqsHandlerUsingAttribute
- Policies:
- - SQSPollerPolicy:
- QueueName: !GetAtt SampleQueue.QueueName
- - KMSDecryptPolicy:
- KeyId: !Ref CustomerKey
- Events:
- Batch:
- Type: SQS
- Properties:
- Queue: !GetAtt SampleQueue.Arn
- FunctionResponseTypes:
- - ReportBatchItemFailures
-
- SampleDLQ:
- Type: AWS::SQS::Queue
- Properties:
- KmsMasterKeyId: !Ref CustomerKey
- SampleQueue:
- Type: AWS::SQS::Queue
- Properties:
- VisibilityTimeout: 30 # Fn timeout * 6
- SqsManagedSseEnabled: true
- RedrivePolicy:
- maxReceiveCount: 2
- deadLetterTargetArn: !GetAtt SampleDLQ.Arn
- KmsMasterKeyId: !Ref CustomerKey
-
# --------------
- # KMS key for encrypted queues
+ # KMS key for encrypted messages / records
CustomerKey:
Type: AWS::KMS::Key
Properties:
Description: KMS key for encrypted queues
Enabled: true
KeyPolicy:
- Version: '2012-10-17'
+ Version: "2012-10-17"
Statement:
- Sid: Enable IAM User Permissions
Effect: Allow
Principal:
- AWS: !Sub 'arn:aws:iam::${AWS::AccountId}:root'
- Action: 'kms:*'
- Resource: '*'
- - Sid: Allow use of the key
+ AWS: !Sub "arn:aws:iam::${AWS::AccountId}:root"
+ Action: "kms:*"
+ Resource: "*"
+ - Sid: Allow AWS Lambda to use the key
Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action:
- kms:Decrypt
- kms:GenerateDataKey
- Resource: '*'
+ Resource: "*"
CustomerKeyAlias:
Type: AWS::KMS::Alias
Properties:
- AliasName: alias/powertools-batch-sqs-demo
- TargetKeyId: !Ref CustomerKey
\ No newline at end of file
+ AliasName: !Sub alias/${AWS::StackName}-kms-key
+ TargetKeyId: !Ref CustomerKey
+
+ # --------------
+ # Batch Processing for SQS Queue
+ SqsDeadLetterQueue:
+ Type: AWS::SQS::Queue
+ Properties:
+ KmsMasterKeyId: !Ref CustomerKey
+
+ SqsQueue:
+ Type: AWS::SQS::Queue
+ Properties:
+ RedrivePolicy:
+ deadLetterTargetArn: !GetAtt SqsDeadLetterQueue.Arn
+ maxReceiveCount: 2
+ KmsMasterKeyId: !Ref CustomerKey
+
+ SqsBatchProcessorFunction:
+ Type: AWS::Serverless::Function
+ Properties:
+ CodeUri: ./src/HelloWorld/
+ Handler: HelloWorld::HelloWorld.Function::SqsHandlerUsingAttribute
+ Policies:
+ - Statement:
+ - Sid: DlqPermissions
+ Effect: Allow
+ Action:
+ - sqs:SendMessage
+ - sqs:SendMessageBatch
+ Resource: !GetAtt SqsDeadLetterQueue.Arn
+ - Sid: KmsKeyPermissions
+ Effect: Allow
+ Action:
+ - kms:Decrypt
+ - kms:GenerateDataKey
+ Resource: !GetAtt CustomerKey.Arn
+ Events:
+ SqsBatch:
+ Type: SQS
+ Properties:
+ BatchSize: 5
+ Enabled: true
+ FunctionResponseTypes:
+ - ReportBatchItemFailures
+ Queue: !GetAtt SqsQueue.Arn
+
+ SqsBatchProcessorFunctionLogGroup:
+ Type: AWS::Logs::LogGroup
+ Properties:
+ LogGroupName: !Sub "/aws/lambda/${SqsBatchProcessorFunction}"
+ RetentionInDays: 7
+
+Outputs:
+ SqsQueueUrl:
+ Description: "SQS Queue URL"
+ Value: !Ref SqsQueue
\ No newline at end of file
diff --git a/docs/utilities/batch-processing.md b/docs/utilities/batch-processing.md
index eb18eb7c..021f8ef8 100644
--- a/docs/utilities/batch-processing.md
+++ b/docs/utilities/batch-processing.md
@@ -84,11 +84,12 @@ You use your preferred deployment framework to set the correct configuration whi
Batch processing can be configured with the settings below; a short code sketch after the table shows the equivalent `ProcessingOptions` properties:
-Setting | Description | Environment variable | Default
-------------------------------------------------- |-------------------------------------------------------------------------| ------------------------------------------------- | -------------------------------------------------
-**Error Handling Policy** | Sets the error handling policy applied during batch processing. | `POWERTOOLS_BATCH_ERROR_HANDLING_POLICY` | `DeriveFromEvent`
-**Parallel Enabled** | Sets if parallelism is enabled | `POWERTOOLS_BATCH_PARALLEL_ENABLED` | `false`
-**Max Degree of Parallelism** | Sets the maximum degree of parallelism to apply during batch processing | `POWERTOOLS_BATCH_MAX_DEGREE_OF_PARALLELISM` | `1`
+Setting | Description | Environment variable | Default
+------- | ----------- | -------------------- | -------
+**Error Handling Policy** | The error handling policy to apply during batch processing. | `POWERTOOLS_BATCH_ERROR_HANDLING_POLICY` | `DeriveFromEvent`
+**Parallel Enabled** | Controls if parallel processing of batch items is enabled. | `POWERTOOLS_BATCH_PARALLEL_ENABLED` | `false`
+**Max Degree of Parallelism** | The maximum degree of parallelism to apply if parallel processing is enabled. | `POWERTOOLS_BATCH_MAX_DEGREE_OF_PARALLELISM` | `1`
+**Throw on Full Batch Failure** | Controls if a `BatchProcessingException` is thrown on full batch failure. | `POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE` | `true`
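+
+These environment variables map to the corresponding `ProcessingOptions` properties when you invoke the utility directly. The sketch below is illustrative only: it assumes an SQS handler like the ones shown later on this page and simply restates the defaults from the table in code.
+
+```csharp
+public async Task<BatchItemFailuresResponse> Handler(SQSEvent sqsEvent)
+{
+    var result = await SqsBatchProcessor.Instance.ProcessAsync(sqsEvent, RecordHandler.From(x =>
+    {
+        // Inline handling of SQS message...
+    }), new ProcessingOptions
+    {
+        // Programmatic equivalents of the settings above
+        ErrorHandlingPolicy = BatchProcessorErrorHandlingPolicy.DeriveFromEvent,
+        BatchParallelProcessingEnabled = false,
+        MaxDegreeOfParallelism = 1,
+        ThrowOnFullBatchFailure = true
+    });
+    return result.BatchItemFailuresResponse;
+}
+```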
### Required resources
@@ -98,19 +99,19 @@ The remaining sections of the documentation will rely on these samples. For comp
=== "SQS"
- ```yaml title="template.yaml" hl_lines="36-37"
+ ```yaml title="template.yaml" hl_lines="93-94"
--8<-- "docs/snippets/batch/templates/sqs.yaml"
```
=== "Kinesis Data Streams"
- ```yaml title="template.yaml" hl_lines="50-51"
+ ```yaml title="template.yaml" hl_lines="109-110"
--8<-- "docs/snippets/batch/templates/kinesis.yaml"
```
=== "DynamoDB Streams"
- ```yaml title="template.yaml" hl_lines="49-50"
+ ```yaml title="template.yaml" hl_lines="102-103"
--8<-- "docs/snippets/batch/templates/dynamodb.yaml"
```
@@ -613,14 +614,13 @@ Another approach is to decorate the handler and use one of the policies in the *
```
-
### Partial failure mechanics
All records in the batch will be passed to this handler for processing, even if exceptions are thrown - Here's the behaviour after completing the batch:
-* **All records successfully processed**. We will return an empty list of item failures `{'batchItemFailures': []}`
-* **Partial success with some exceptions**. We will return a list of all item IDs/sequence numbers that failed processing
-* **All records failed to be processed**. We will raise `AWS.Lambda.Powertools.BatchProcessing.Exceptions.BatchProcessingException` exception with a list of all exceptions raised when processing
+* **All records successfully processed**. We will return an empty list of item failures `{'batchItemFailures': []}`.
+* **Partial success with some exceptions**. We will return a list of all item IDs/sequence numbers that failed processing.
+* **All records failed to be processed**. By default, we will throw a `BatchProcessingException` with a list of all exceptions raised during processing to reflect the failure in your operational metrics. However, in some scenarios, this might not be desired. See [Working with full batch failures](#working-with-full-batch-failures) for more information.
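+
+To make these mechanics concrete, here is a minimal sketch adapted from the SQS handler in the example project; the `JsonElement` parsing and the failing `Id == 4` record are purely illustrative. Only the failing record's message ID is returned in `batchItemFailures`, so successfully processed records are not retried.
+
+```csharp
+public async Task<BatchItemFailuresResponse> HandlerUsingUtility(SQSEvent sqsEvent)
+{
+    var result = await SqsBatchProcessor.Instance.ProcessAsync(sqsEvent, RecordHandler.From(sqsMessage =>
+    {
+        var product = JsonSerializer.Deserialize<JsonElement>(sqsMessage.Body);
+        if (product.GetProperty("Id").GetInt16() == 4)
+        {
+            // Only this record is reported back as a batch item failure
+            throw new ArgumentException("Error on 4");
+        }
+    }));
+    return result.BatchItemFailuresResponse;
+}
+```
+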
The following sequence diagrams explain how each Batch processor behaves under different scenarios.
@@ -735,7 +735,6 @@ sequenceDiagram
Kinesis and DynamoDB streams mechanism with multiple batch item failures
-
### Advanced
#### Using utility outside handler and IoC
@@ -848,6 +847,44 @@ You can also set `POWERTOOLS_BATCH_MAX_DEGREE_OF_PARALLELISM` Environment Variab
}
```
+#### Working with full batch failures
+
+By default, the `BatchProcessor` will throw a `BatchProcessingException` if all records in the batch fail to process. We do this to reflect the failure in your operational metrics.
+
+When working with functions that handle batches with a small number of records, or when you use errors as a flow control mechanism, this behavior might not be desirable as your function might generate an unnaturally high number of errors. When this happens, the [Lambda service will scale down the concurrency of your function](https://docs.aws.amazon.com/lambda/latest/dg/services-sqs-errorhandling.html#services-sqs-backoff-strategy){target="_blank"}, potentially impacting performance.
+
+For these scenarios, you can set `POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE` to `false`, or set the equivalent property on either the `BatchProcessor` decorator or the `ProcessingOptions` object. See the examples below.
+
+=== "Setting ThrowOnFullBatchFailure on Decorator"
+
+ ```csharp hl_lines="3"
+ [BatchProcessor(
+ RecordHandler = typeof(CustomSqsRecordHandler),
+ ThrowOnFullBatchFailure = false)]
+ public BatchItemFailuresResponse HandlerUsingAttribute(SQSEvent _)
+ {
+ return SqsBatchProcessor.Result.BatchItemFailuresResponse;
+ }
+
+ ```
+
+=== "Setting ThrowOnFullBatchFailure outside Decorator"
+
+ ```csharp hl_lines="8"
+    public async Task<BatchItemFailuresResponse> HandlerUsingUtility(SQSEvent sqsEvent)
+ {
+ var result = await SqsBatchProcessor.Instance.ProcessAsync(sqsEvent, RecordHandler.From(x =>
+ {
+ // Inline handling of SQS message...
+ }), new ProcessingOptions
+ {
+ ThrowOnFullBatchFailure = false
+ });
+ return result.BatchItemFailuresResponse;
+ }
+
+ ```
+
#### Extending BatchProcessor
You might want to bring custom logic to the existing `BatchProcessor` to slightly override how we handle successes and failures.
diff --git a/examples/AOT/AOT_Logging/src/AOT_Logging/AOT_Logging.csproj b/examples/AOT/AOT_Logging/src/AOT_Logging/AOT_Logging.csproj
index f4b9fa5a..ddad6082 100644
--- a/examples/AOT/AOT_Logging/src/AOT_Logging/AOT_Logging.csproj
+++ b/examples/AOT/AOT_Logging/src/AOT_Logging/AOT_Logging.csproj
@@ -20,6 +20,6 @@
-
+
\ No newline at end of file
diff --git a/examples/BatchProcessing/src/HelloWorld/Function.cs b/examples/BatchProcessing/src/HelloWorld/Function.cs
index f1ff8d42..f8bd0aad 100644
--- a/examples/BatchProcessing/src/HelloWorld/Function.cs
+++ b/examples/BatchProcessing/src/HelloWorld/Function.cs
@@ -42,58 +42,58 @@ static Function()
Services.Init();
}
- [BatchProcessor(RecordHandler = typeof(CustomDynamoDbStreamRecordHandler))]
[Logging(LogEvent = true)]
+ [BatchProcessor(RecordHandler = typeof(CustomDynamoDbStreamRecordHandler))]
public BatchItemFailuresResponse DynamoDbStreamHandlerUsingAttribute(DynamoDBEvent _)
{
return DynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse;
}
-
- [BatchProcessor(RecordHandler = typeof(CustomKinesisEventRecordHandler))]
+
[Logging(LogEvent = true)]
+ [BatchProcessor(RecordHandler = typeof(CustomKinesisEventRecordHandler))]
public BatchItemFailuresResponse KinesisEventHandlerUsingAttribute(KinesisEvent _)
{
return KinesisEventBatchProcessor.Result.BatchItemFailuresResponse;
}
- [BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler))]
[Logging(LogEvent = true)]
+ [BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler))]
public BatchItemFailuresResponse SqsHandlerUsingAttribute(SQSEvent _)
{
return SqsBatchProcessor.Result.BatchItemFailuresResponse;
}
-
- [BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler), ErrorHandlingPolicy = BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure)]
+
[Logging(LogEvent = true)]
+ [BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler), ErrorHandlingPolicy = BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure)]
public BatchItemFailuresResponse SqsHandlerUsingAttributeWithErrorPolicy(SQSEvent _)
{
return SqsBatchProcessor.Result.BatchItemFailuresResponse;
}
#region More example handlers...
-
- [BatchProcessor(RecordHandlerProvider = typeof(CustomSqsRecordHandlerProvider), BatchProcessor = typeof(CustomSqsBatchProcessor))]
+
[Logging(LogEvent = true)]
+ [BatchProcessor(RecordHandlerProvider = typeof(CustomSqsRecordHandlerProvider), BatchProcessor = typeof(CustomSqsBatchProcessor))]
public BatchItemFailuresResponse HandlerUsingAttributeAndCustomRecordHandlerProvider(SQSEvent _)
{
return SqsBatchProcessor.Result.BatchItemFailuresResponse;
- }
-
- [BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler), BatchProcessor = typeof(CustomSqsBatchProcessor))]
+ }
+
[Logging(LogEvent = true)]
+ [BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler), BatchProcessor = typeof(CustomSqsBatchProcessor))]
public BatchItemFailuresResponse HandlerUsingAttributeAndCustomBatchProcessor(SQSEvent _)
{
return SqsBatchProcessor.Result.BatchItemFailuresResponse;
}
-
- [BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler), BatchProcessorProvider = typeof(CustomSqsBatchProcessorProvider))]
+
[Logging(LogEvent = true)]
+ [BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler), BatchProcessorProvider = typeof(CustomSqsBatchProcessorProvider))]
public BatchItemFailuresResponse HandlerUsingAttributeAndCustomBatchProcessorProvider(SQSEvent _)
{
var batchProcessor = Services.Provider.GetRequiredService();
return batchProcessor.ProcessingResult.BatchItemFailuresResponse;
}
-
+
[Logging(LogEvent = true)]
public async Task HandlerUsingUtility(SQSEvent sqsEvent)
{
@@ -103,7 +103,7 @@ public async Task HandlerUsingUtility(SQSEvent sqsEve
}));
return result.BatchItemFailuresResponse;
}
-
+
[Logging(LogEvent = true)]
public async Task HandlerUsingUtilityFromIoc(SQSEvent sqsEvent)
{
@@ -112,6 +112,6 @@ public async Task HandlerUsingUtilityFromIoc(SQSEvent
var result = await batchProcessor.ProcessAsync(sqsEvent, recordHandler);
return result.BatchItemFailuresResponse;
}
-
+
#endregion
}
diff --git a/examples/BatchProcessing/src/HelloWorld/HelloWorld.csproj b/examples/BatchProcessing/src/HelloWorld/HelloWorld.csproj
index c7ed631c..30249138 100644
--- a/examples/BatchProcessing/src/HelloWorld/HelloWorld.csproj
+++ b/examples/BatchProcessing/src/HelloWorld/HelloWorld.csproj
@@ -1,6 +1,6 @@
- net6.0;net8.0
+ net8.0
true
enable
@@ -8,7 +8,7 @@
-
+
diff --git a/examples/BatchProcessing/template.yaml b/examples/BatchProcessing/template.yaml
index 937f1e75..17dfd20c 100644
--- a/examples/BatchProcessing/template.yaml
+++ b/examples/BatchProcessing/template.yaml
@@ -2,122 +2,116 @@
# SPDX-License-Identifier: MIT-0
AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31
-Description: Example project demoing the Batch Processing utility SQS in Powertools for AWS Lambda (.NET)
+Description: Example project demoing the Batch Processing Utility in Powertools for AWS Lambda (.NET)
-# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst
Globals:
Function:
Timeout: 20
- Runtime: dotnet6
- MemorySize: 256
+ Runtime: dotnet8
+ MemorySize: 1024
Environment:
Variables:
- POWERTOOLS_SERVICE_NAME: powertools-dotnet-sample-batch-processing
+ POWERTOOLS_SERVICE_NAME: powertools-dotnet-sample-batch
POWERTOOLS_LOG_LEVEL: Debug
- POWERTOOLS_LOGGER_CASE: SnakeCase # Allowed values are: CamelCase, PascalCase and SnakeCase
+ POWERTOOLS_LOGGER_CASE: PascalCase
POWERTOOLS_BATCH_ERROR_HANDLING_POLICY: DeriveFromEvent
POWERTOOLS_BATCH_MAX_DEGREE_OF_PARALLELISM: 1
POWERTOOLS_BATCH_PARALLEL_ENABLED : false
+ POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE: true
Resources:
-
+
# --------------
- # KMS key for encrypted queues
+ # KMS key for encrypted messages / records
CustomerKey:
Type: AWS::KMS::Key
Properties:
Description: KMS key for encrypted queues
Enabled: true
KeyPolicy:
- Version: '2012-10-17'
+ Version: "2012-10-17"
Statement:
- Sid: Enable IAM User Permissions
Effect: Allow
Principal:
- AWS: !Sub 'arn:aws:iam::${AWS::AccountId}:root'
- Action: 'kms:*'
- Resource: '*'
- - Sid: Allow use of the key
+ AWS: !Sub "arn:aws:iam::${AWS::AccountId}:root"
+ Action: "kms:*"
+ Resource: "*"
+ - Sid: Allow AWS Lambda to use the key
Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action:
- kms:Decrypt
- kms:GenerateDataKey
- Resource: '*'
-
+ Resource: "*"
+
CustomerKeyAlias:
Type: AWS::KMS::Alias
Properties:
- AliasName: alias/powertools-batch-sqs-demo
+ AliasName: !Sub alias/${AWS::StackName}-kms-key
TargetKeyId: !Ref CustomerKey
# --------------
- # SQS DL Queue
- DemoDlqSqsQueue:
+ # Batch Processing for SQS Queue
+ SqsDeadLetterQueue:
Type: AWS::SQS::Queue
Properties:
KmsMasterKeyId: !Ref CustomerKey
-
- # --------------
- # SQS Queue
- DemoSqsQueue:
+
+ SqsQueue:
Type: AWS::SQS::Queue
Properties:
RedrivePolicy:
- deadLetterTargetArn:
- Fn::GetAtt:
- - "DemoDlqSqsQueue"
- - "Arn"
+ deadLetterTargetArn: !GetAtt SqsDeadLetterQueue.Arn
maxReceiveCount: 2
KmsMasterKeyId: !Ref CustomerKey
-
- # --------------
- # Batch Processing for SQS
- SampleSqsBatchProcessorFunction:
- Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction
+
+ SqsBatchProcessorFunction:
+ Type: AWS::Serverless::Function
Properties:
- FunctionName: powertools-dotnet-sample-batch-processor-sqs
CodeUri: ./src/HelloWorld/
Handler: HelloWorld::HelloWorld.Function::SqsHandlerUsingAttribute
- # ReservedConcurrentExecutions: 1
Policies:
- Statement:
- - Sid: SQSDeleteGetAttribute
+ - Sid: DlqPermissions
Effect: Allow
Action:
- - sqs:DeleteMessageBatch
- - sqs:GetQueueAttributes
- Resource: !GetAtt DemoSqsQueue.Arn
- - Sid: SQSSendMessageBatch
- Effect: Allow
- Action:
- - sqs:SendMessageBatch
- sqs:SendMessage
- Resource: !GetAtt DemoDlqSqsQueue.Arn
- - Sid: SQSKMSKey
+ - sqs:SendMessageBatch
+ Resource: !GetAtt SqsDeadLetterQueue.Arn
+ - Sid: KmsKeyPermissions
Effect: Allow
Action:
- - kms:GenerateDataKey
- kms:Decrypt
+ - kms:GenerateDataKey
Resource: !GetAtt CustomerKey.Arn
Events:
SqsBatch:
- Type: SQS # More info about SQS Event Source: https://github.com/aws/serverless-application-model/blob/master/versions/2016-10-31.md#sqs
+ Type: SQS
Properties:
- Queue: !GetAtt DemoSqsQueue.Arn
BatchSize: 5
-# MaximumBatchingWindowInSeconds: 300
- FunctionResponseTypes:
+ Enabled: true
+ FunctionResponseTypes:
- ReportBatchItemFailures
-
+ Queue: !GetAtt SqsQueue.Arn
+
+ SqsBatchProcessorFunctionLogGroup:
+ Type: AWS::Logs::LogGroup
+ Properties:
+ LogGroupName: !Sub "/aws/lambda/${SqsBatchProcessorFunction}"
+ RetentionInDays: 7
+
# --------------
- # Batch Processing for DynamoDb
-
- SampleDynamoDBTable:
+ # Batch Processing for DynamoDb (DDB) Stream
+ DdbStreamDeadLetterQueue:
+ Type: AWS::SQS::Queue
+ Properties:
+ KmsMasterKeyId: !Ref CustomerKey
+
+ DdbTable:
Type: AWS::DynamoDB::Table
Properties:
- TableName: powertools-dotnet-sample-dynamodb-table
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: id
@@ -127,28 +121,57 @@ Resources:
KeyType: HASH
StreamSpecification:
StreamViewType: NEW_AND_OLD_IMAGES
-
- DemoDynamoDBStreamProcessorFunction:
+
+ DdbStreamBatchProcessorFunction:
Type: AWS::Serverless::Function
Properties:
- FunctionName: powertools-dotnet-sample-batch-processor-dynamodb
CodeUri: ./src/HelloWorld/
Handler: HelloWorld::HelloWorld.Function::DynamoDbStreamHandlerUsingAttribute
- Policies: AWSLambdaDynamoDBExecutionRole
+ Policies:
+ - AWSLambdaDynamoDBExecutionRole
+ - Statement:
+ - Sid: DlqPermissions
+ Effect: Allow
+ Action:
+ - sqs:SendMessage
+ - sqs:SendMessageBatch
+ Resource: !GetAtt DdbStreamDeadLetterQueue.Arn
+ - Sid: KmsKeyPermissions
+ Effect: Allow
+ Action:
+ - kms:GenerateDataKey
+ Resource: !GetAtt CustomerKey.Arn
Events:
Stream:
Type: DynamoDB
Properties:
- Stream: !GetAtt SampleDynamoDBTable.StreamArn
- BatchSize: 100
- StartingPosition: TRIM_HORIZON
+ BatchSize: 5
+ BisectBatchOnFunctionError: true
+ DestinationConfig:
+ OnFailure:
+ Destination: !GetAtt DdbStreamDeadLetterQueue.Arn
+ Enabled: true
FunctionResponseTypes:
- ReportBatchItemFailures
-
+ MaximumRetryAttempts: 2
+ ParallelizationFactor: 1
+ StartingPosition: LATEST
+ Stream: !GetAtt DdbTable.StreamArn
+
+ DdbStreamBatchProcessorFunctionLogGroup:
+ Type: AWS::Logs::LogGroup
+ Properties:
+ LogGroupName: !Sub "/aws/lambda/${DdbStreamBatchProcessorFunction}"
+ RetentionInDays: 7
+
# --------------
- # Batch Processing for Kinesis Data Streams
-
- DemoKinesisStream:
+ # Batch Processing for Kinesis Data Stream
+ KinesisStreamDeadLetterQueue:
+ Type: AWS::SQS::Queue
+ Properties:
+ KmsMasterKeyId: !Ref CustomerKey
+
+ KinesisStream:
Type: AWS::Kinesis::Stream
Properties:
ShardCount: 1
@@ -156,42 +179,67 @@ Resources:
EncryptionType: KMS
KeyId: !Ref CustomerKey
- StreamConsumer:
- Type: "AWS::Kinesis::StreamConsumer"
+ KinesisStreamConsumer:
+ Type: AWS::Kinesis::StreamConsumer
Properties:
- StreamARN: !GetAtt DemoKinesisStream.Arn
- ConsumerName: KinesisBatchHandlerConsumer
-
-
- SampleKinesisEventBatchProcessorFunction:
+ ConsumerName: powertools-dotnet-sample-batch-kds-consumer
+ StreamARN: !GetAtt KinesisStream.Arn
+
+ KinesisBatchProcessorFunction:
Type: AWS::Serverless::Function
Properties:
- FunctionName: powertools-dotnet-sample-batch-processor-kinesis-data-stream
- Runtime: dotnet6
+ Policies:
+ - Statement:
+ - Sid: KinesisStreamConsumerPermissions
+ Effect: Allow
+ Action:
+ - kinesis:DescribeStreamConsumer
+ Resource:
+ - !GetAtt KinesisStreamConsumer.ConsumerARN
+ - Sid: DlqPermissions
+ Effect: Allow
+ Action:
+ - sqs:SendMessage
+ - sqs:SendMessageBatch
+ Resource: !GetAtt KinesisStreamDeadLetterQueue.Arn
+ - Sid: KmsKeyPermissions
+ Effect: Allow
+ Action:
+ - kms:Decrypt
+ - kms:GenerateDataKey
+ Resource: !GetAtt CustomerKey.Arn
CodeUri: ./src/HelloWorld/
Handler: HelloWorld::HelloWorld.Function::KinesisEventHandlerUsingAttribute
- MemorySize: 256
Events:
Kinesis:
Type: Kinesis
Properties:
- Stream: !GetAtt StreamConsumer.ConsumerARN
+ BatchSize: 5
+ BisectBatchOnFunctionError: true
+ DestinationConfig:
+ OnFailure:
+ Destination: !GetAtt KinesisStreamDeadLetterQueue.Arn
+ Enabled: true
+ FunctionResponseTypes:
+ - ReportBatchItemFailures
+ MaximumRetryAttempts: 2
+ ParallelizationFactor: 1
StartingPosition: LATEST
- BatchSize: 2
+ Stream: !GetAtt KinesisStreamConsumer.ConsumerARN
+
+ KinesisBatchProcessorFunctionLogGroup:
+ Type: AWS::Logs::LogGroup
+ Properties:
+ LogGroupName: !Sub "/aws/lambda/${KinesisBatchProcessorFunction}"
+ RetentionInDays: 7
Outputs:
- DemoSqsQueue:
- Description: "ARN for main SQS queue"
- Value: !GetAtt DemoSqsQueue.Arn
- DemoDlqSqsQueue:
- Description: "ARN for DLQ"
- Value: !GetAtt DemoDlqSqsQueue.Arn
- SampleSqsBatchProcessorFunction:
- Description: "SQS Batch Handler - Lambda Function ARN"
- Value: !GetAtt SampleSqsBatchProcessorFunction.Arn
- DemoKinesisQueue:
- Description: "ARN for Kinesis Stream"
- Value: !GetAtt DemoKinesisStream.Arn
- DemoSQSConsumerFunction:
- Description: "SQS Batch Handler - Lambda Function ARN"
- Value: !GetAtt SampleKinesisEventBatchProcessorFunction.Arn
\ No newline at end of file
+ SqsQueueUrl:
+ Description: "SQS Queue URL"
+ Value: !Ref SqsQueue
+ DdbTableName:
+ Description: "DynamoDB Table Name"
+ Value: !Ref DdbTable
+ KinesisStreamArn:
+ Description: "Kinesis Stream ARN"
+ Value: !GetAtt KinesisStream.Arn
\ No newline at end of file
diff --git a/examples/BatchProcessing/test/HelloWorld.Test/HelloWorld.Tests.csproj b/examples/BatchProcessing/test/HelloWorld.Test/HelloWorld.Tests.csproj
index b4be1182..903aee7d 100644
--- a/examples/BatchProcessing/test/HelloWorld.Test/HelloWorld.Tests.csproj
+++ b/examples/BatchProcessing/test/HelloWorld.Test/HelloWorld.Tests.csproj
@@ -1,6 +1,6 @@
- net6.0;net8.0
+ net8.0
diff --git a/examples/Idempotency/src/HelloWorld/HelloWorld.csproj b/examples/Idempotency/src/HelloWorld/HelloWorld.csproj
index 83b3be9f..b3260917 100644
--- a/examples/Idempotency/src/HelloWorld/HelloWorld.csproj
+++ b/examples/Idempotency/src/HelloWorld/HelloWorld.csproj
@@ -9,6 +9,6 @@
-
+
diff --git a/examples/Logging/src/HelloWorld/HelloWorld.csproj b/examples/Logging/src/HelloWorld/HelloWorld.csproj
index 58b01757..439c2d27 100644
--- a/examples/Logging/src/HelloWorld/HelloWorld.csproj
+++ b/examples/Logging/src/HelloWorld/HelloWorld.csproj
@@ -8,7 +8,7 @@
-
+
diff --git a/examples/Metrics/src/HelloWorld/HelloWorld.csproj b/examples/Metrics/src/HelloWorld/HelloWorld.csproj
index db6bf8bf..4bcfd8a3 100644
--- a/examples/Metrics/src/HelloWorld/HelloWorld.csproj
+++ b/examples/Metrics/src/HelloWorld/HelloWorld.csproj
@@ -8,7 +8,7 @@
-
+
diff --git a/examples/ServerlessApi/src/LambdaPowertoolsAPI/LambdaPowertoolsAPI.csproj b/examples/ServerlessApi/src/LambdaPowertoolsAPI/LambdaPowertoolsAPI.csproj
index fd8e3c25..dfa8c4bf 100644
--- a/examples/ServerlessApi/src/LambdaPowertoolsAPI/LambdaPowertoolsAPI.csproj
+++ b/examples/ServerlessApi/src/LambdaPowertoolsAPI/LambdaPowertoolsAPI.csproj
@@ -13,7 +13,7 @@
-
+
diff --git a/examples/Tracing/src/HelloWorld/HelloWorld.csproj b/examples/Tracing/src/HelloWorld/HelloWorld.csproj
index b0a1a528..5d051ba9 100644
--- a/examples/Tracing/src/HelloWorld/HelloWorld.csproj
+++ b/examples/Tracing/src/HelloWorld/HelloWorld.csproj
@@ -8,7 +8,7 @@
-
+
diff --git a/libraries/src/AWS.Lambda.Powertools.BatchProcessing/BatchProcessor.cs b/libraries/src/AWS.Lambda.Powertools.BatchProcessing/BatchProcessor.cs
index 2b3b84ce..ba3c5f3f 100644
--- a/libraries/src/AWS.Lambda.Powertools.BatchProcessing/BatchProcessor.cs
+++ b/libraries/src/AWS.Lambda.Powertools.BatchProcessing/BatchProcessor.cs
@@ -87,7 +87,7 @@ public virtual async Task> ProcessAsync(TEvent @event,
: processingOptions.ErrorHandlingPolicy;
// Invoke hook
- await BeforeBatchProcessingAsync(@event);
+ await BeforeBatchProcessingAsync(@event, processingOptions);
try
{
@@ -139,9 +139,13 @@ await ProcessRecord(recordHandler, pair, cancellationToken, failureRecords, succ
ProcessingResult.FailureRecords.AddRange(failureRecords.Values);
}
- if (successRecords != null) ProcessingResult.SuccessRecords.AddRange(successRecords.Values);
+ if (successRecords != null)
+ {
+ ProcessingResult.SuccessRecords.AddRange(successRecords.Values);
+ }
+
// Invoke hook
- await AfterBatchProcessingAsync(@event, ProcessingResult);
+ await AfterBatchProcessingAsync(@event, ProcessingResult, processingOptions);
// Return result
return ProcessingResult;
@@ -226,8 +230,9 @@ private async Task ProcessRecord(IRecordHandler recordHandler, KeyValue
/// Hook invoked before the batch event is processed.
///
/// The event to be processed.
+ /// The configured batch processing options for this batch processing run.
/// An awaitable .
- protected virtual async Task BeforeBatchProcessingAsync(TEvent @event)
+ protected virtual async Task BeforeBatchProcessingAsync(TEvent @event, ProcessingOptions processingOptions)
{
await Task.CompletedTask;
}
@@ -272,12 +277,15 @@ protected virtual async Task HandleRecordFailureAsync(TRecord record, Exception
/// Hook invoked after the batch event has been processed.
///
/// The event that was processed.
- ///
+ /// The result of this batch processing run.
+ /// The configured batch processing options for this batch processing run.
/// An awaitable .
///
- protected virtual async Task AfterBatchProcessingAsync(TEvent @event, ProcessingResult processingResult)
+ protected virtual async Task AfterBatchProcessingAsync(TEvent @event,
+ ProcessingResult processingResult,
+ ProcessingOptions processingOptions)
{
- if (processingResult.BatchRecords.Count == processingResult.FailureRecords.Count)
+ if (processingOptions.ThrowOnFullBatchFailure && processingResult.BatchRecords.Count == processingResult.FailureRecords.Count)
{
throw new BatchProcessingException(
$"Entire batch of '{processingResult.BatchRecords.Count}' record(s) failed processing. See inner exceptions for details.",
diff --git a/libraries/src/AWS.Lambda.Powertools.BatchProcessing/BatchProcessorAttribute.cs b/libraries/src/AWS.Lambda.Powertools.BatchProcessing/BatchProcessorAttribute.cs
index b8c0b281..d693d4ec 100644
--- a/libraries/src/AWS.Lambda.Powertools.BatchProcessing/BatchProcessorAttribute.cs
+++ b/libraries/src/AWS.Lambda.Powertools.BatchProcessing/BatchProcessorAttribute.cs
@@ -23,6 +23,7 @@
using Amazon.Lambda.SQSEvents;
using AspectInjector.Broker;
using AWS.Lambda.Powertools.BatchProcessing.DynamoDb;
+using AWS.Lambda.Powertools.BatchProcessing.Exceptions;
using AWS.Lambda.Powertools.BatchProcessing.Internal;
using AWS.Lambda.Powertools.BatchProcessing.Kinesis;
using AWS.Lambda.Powertools.BatchProcessing.Sqs;
@@ -90,6 +91,10 @@ namespace AWS.Lambda.Powertools.BatchProcessing;
/// POWERTOOLS_BATCH_PROCESSING_MAX_DEGREE_OF_PARALLELISM
/// int, defaults to 1 (no parallelism). Specify -1 to automatically use the value of ProcessorCount.
///
+/// -
+/// POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE
+/// bool, defaults to true. Controls if an exception is thrown on full batch failure.
+///
///
///
/// Parameters
@@ -123,6 +128,10 @@ namespace AWS.Lambda.Powertools.BatchProcessing;
/// MaxDegreeOfParallelism
/// int, defaults to 1 (no parallelism). Specify -1 to automatically use the value of ProcessorCount.
///
+/// -
+/// ThrowOnFullBatchFailure
+/// bool, defaults to true. Controls if an exception is thrown on full batch failure.
+///
///
///
[AttributeUsage(AttributeTargets.Method)]
@@ -132,38 +141,44 @@ public class BatchProcessorAttribute : UniversalWrapperAttribute
///
/// Type of batch processor.
///
- public Type BatchProcessor;
+ public Type BatchProcessor { get; set; }
///
/// Type of batch processor provider.
///
- public Type BatchProcessorProvider;
+ public Type BatchProcessorProvider { get; set; }
///
/// Type of record handler.
///
- public Type RecordHandler;
+ public Type RecordHandler { get; set; }
///
/// Type of record handler provider.
///
- public Type RecordHandlerProvider;
+ public Type RecordHandlerProvider { get; set; }
///
/// Error handling policy.
///
- public BatchProcessorErrorHandlingPolicy ErrorHandlingPolicy;
+ public BatchProcessorErrorHandlingPolicy ErrorHandlingPolicy { get; set; }
///
/// Batch processing enabled (default false)
///
- public bool BatchParallelProcessingEnabled = PowertoolsConfigurations.Instance.BatchParallelProcessingEnabled;
+ public bool BatchParallelProcessingEnabled { get; set; } = PowertoolsConfigurations.Instance.BatchParallelProcessingEnabled;
///
/// The maximum degree of parallelism to apply during batch processing.
/// Must enable BatchParallelProcessingEnabled
///
- public int MaxDegreeOfParallelism = PowertoolsConfigurations.Instance.BatchProcessingMaxDegreeOfParallelism;
+ public int MaxDegreeOfParallelism { get; set; } = PowertoolsConfigurations.Instance.BatchProcessingMaxDegreeOfParallelism;
+
+    /// <summary>
+    /// By default, the Batch processor throws a <see cref="BatchProcessingException"/> on full batch failure.
+    /// This behaviour can be disabled by setting this value to false.
+    /// </summary>
+ public bool ThrowOnFullBatchFailure { get; set; } = PowertoolsConfigurations.Instance.BatchThrowOnFullBatchFailureEnabled;
private static readonly Dictionary EventTypes = new()
{
@@ -332,7 +347,8 @@ private BatchProcessingAspectHandler CreateBatchProcessingAspec
CancellationToken = CancellationToken.None,
ErrorHandlingPolicy = errorHandlingPolicy,
MaxDegreeOfParallelism = MaxDegreeOfParallelism,
- BatchParallelProcessingEnabled = BatchParallelProcessingEnabled
+ BatchParallelProcessingEnabled = BatchParallelProcessingEnabled,
+ ThrowOnFullBatchFailure = ThrowOnFullBatchFailure
});
}
}
diff --git a/libraries/src/AWS.Lambda.Powertools.BatchProcessing/ProcessingOptions.cs b/libraries/src/AWS.Lambda.Powertools.BatchProcessing/ProcessingOptions.cs
index 4c0b72da..8fc7701f 100644
--- a/libraries/src/AWS.Lambda.Powertools.BatchProcessing/ProcessingOptions.cs
+++ b/libraries/src/AWS.Lambda.Powertools.BatchProcessing/ProcessingOptions.cs
@@ -13,7 +13,9 @@
* permissions and limitations under the License.
*/
+using System;
using System.Threading;
+using AWS.Lambda.Powertools.BatchProcessing.Exceptions;
namespace AWS.Lambda.Powertools.BatchProcessing;
@@ -28,17 +30,22 @@ public class ProcessingOptions
public CancellationToken? CancellationToken { get; init; }
///
- /// The maximum degree of parallelism to apply during batch processing.
+ /// The maximum degree of parallelism to apply during batch processing if is enabled (default is -1, which means ).
///
public int? MaxDegreeOfParallelism { get; init; }
///
- /// The error handling policy to apply during batch processing.
+ /// The error handling policy to apply during batch processing (default is ).
///
public BatchProcessorErrorHandlingPolicy? ErrorHandlingPolicy { get; init; }
///
- /// Batch processing enabled (default false)
+ /// Controls whether parallel batch processing is enabled (default false).
///
- public bool BatchParallelProcessingEnabled { get; init; }
+ public bool BatchParallelProcessingEnabled { get; init; } = false;
+
+    /// <summary>
+    /// Controls whether the Batch processor throws a <see cref="BatchProcessingException"/> on full batch failure (default true).
+    /// </summary>
+ public bool ThrowOnFullBatchFailure { get; init; } = true;
}
diff --git a/libraries/src/AWS.Lambda.Powertools.Common/Core/Constants.cs b/libraries/src/AWS.Lambda.Powertools.Common/Core/Constants.cs
index 1a15d30b..912196da 100644
--- a/libraries/src/AWS.Lambda.Powertools.Common/Core/Constants.cs
+++ b/libraries/src/AWS.Lambda.Powertools.Common/Core/Constants.cs
@@ -124,5 +124,10 @@ internal static class Constants
///
/// Constant for POWERTOOLS_BATCH_PARALLEL_ENABLED environment variable
///
- public const string BatchParallelProcessingEnabled = "POWERTOOLS_BATCH_PARALLEL_ENABLED";
+ internal const string BatchParallelProcessingEnabled = "POWERTOOLS_BATCH_PARALLEL_ENABLED";
+
+    /// <summary>
+    /// Constant for POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE environment variable
+    /// </summary>
+ internal const string BatchThrowOnFullBatchFailureEnv = "POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE";
}
\ No newline at end of file
diff --git a/libraries/src/AWS.Lambda.Powertools.Common/Core/IPowertoolsConfigurations.cs b/libraries/src/AWS.Lambda.Powertools.Common/Core/IPowertoolsConfigurations.cs
index 4d08c0fd..ff2c5664 100644
--- a/libraries/src/AWS.Lambda.Powertools.Common/Core/IPowertoolsConfigurations.cs
+++ b/libraries/src/AWS.Lambda.Powertools.Common/Core/IPowertoolsConfigurations.cs
@@ -155,7 +155,11 @@ public interface IPowertoolsConfigurations
/// Gets the maximum degree of parallelism to apply during batch processing.
///
/// Defaults to 1 (no parallelism). Specify -1 to automatically use the value of ProcessorCount.
- int BatchProcessingMaxDegreeOfParallelism { get; }
-
-
+ int BatchProcessingMaxDegreeOfParallelism { get; }
+
+ ///
+ /// Gets a value indicating whether Batch processing will throw an exception on full batch failure.
+ ///
+ /// Defaults to true
+ bool BatchThrowOnFullBatchFailureEnabled { get; }
}
\ No newline at end of file
diff --git a/libraries/src/AWS.Lambda.Powertools.Common/Core/PowertoolsConfigurations.cs b/libraries/src/AWS.Lambda.Powertools.Common/Core/PowertoolsConfigurations.cs
index 72a2a2da..cf316389 100644
--- a/libraries/src/AWS.Lambda.Powertools.Common/Core/PowertoolsConfigurations.cs
+++ b/libraries/src/AWS.Lambda.Powertools.Common/Core/PowertoolsConfigurations.cs
@@ -216,4 +216,7 @@ public void SetExecutionEnvironment(T type)
///
public int BatchProcessingMaxDegreeOfParallelism => GetEnvironmentVariableOrDefault(Constants.BatchMaxDegreeOfParallelismEnv, 1);
+
+ ///
+ public bool BatchThrowOnFullBatchFailureEnabled => GetEnvironmentVariableOrDefault(Constants.BatchThrowOnFullBatchFailureEnv, true);
}
\ No newline at end of file
diff --git a/libraries/src/AWS.Lambda.Powertools.Common/Utils/RuntimeFeatureWrapper.cs b/libraries/src/AWS.Lambda.Powertools.Common/Utils/RuntimeFeatureWrapper.cs
new file mode 100644
index 00000000..9c12eeef
--- /dev/null
+++ b/libraries/src/AWS.Lambda.Powertools.Common/Utils/RuntimeFeatureWrapper.cs
@@ -0,0 +1,29 @@
+using System;
+using System.Runtime.CompilerServices;
+
+namespace AWS.Lambda.Powertools.Common.Utils;
+
+///
+/// Wrapper for RuntimeFeature
+///
+public static class RuntimeFeatureWrapper
+{
+ private static Func _isDynamicCodeSupportedFunc = () => RuntimeFeature.IsDynamicCodeSupported;
+
+ ///
+ /// Check to see if IsDynamicCodeSupported
+ ///
+ public static bool IsDynamicCodeSupported => _isDynamicCodeSupportedFunc();
+
+    // For testing purposes
+ internal static void SetIsDynamicCodeSupported(bool value)
+ {
+ _isDynamicCodeSupportedFunc = () => value;
+ }
+
+    // To reset after tests
+ internal static void Reset()
+ {
+ _isDynamicCodeSupportedFunc = () => RuntimeFeature.IsDynamicCodeSupported;
+ }
+}
\ No newline at end of file
diff --git a/libraries/src/AWS.Lambda.Powertools.Logging/Serializers/PowertoolsLoggingSerializer.cs b/libraries/src/AWS.Lambda.Powertools.Logging/Serializers/PowertoolsLoggingSerializer.cs
index ef6085d9..54958829 100644
--- a/libraries/src/AWS.Lambda.Powertools.Logging/Serializers/PowertoolsLoggingSerializer.cs
+++ b/libraries/src/AWS.Lambda.Powertools.Logging/Serializers/PowertoolsLoggingSerializer.cs
@@ -23,6 +23,7 @@
using System.Text.Json.Serialization.Metadata;
using Amazon.Lambda.Serialization.SystemTextJson;
using AWS.Lambda.Powertools.Common;
+using AWS.Lambda.Powertools.Common.Utils;
using AWS.Lambda.Powertools.Logging.Internal.Converters;
using Microsoft.Extensions.Logging;
@@ -69,6 +70,13 @@ internal static string Serialize(object value, Type inputType)
var options = GetSerializerOptions();
return JsonSerializer.Serialize(value, options);
#else
+ if (RuntimeFeatureWrapper.IsDynamicCodeSupported)
+ {
+ var options = GetSerializerOptions();
+#pragma warning disable
+ return JsonSerializer.Serialize(value, options);
+ }
+
var typeInfo = GetTypeInfo(inputType);
if (typeInfo == null)
{
@@ -154,10 +162,15 @@ private static JsonSerializerOptions BuildJsonSerializerOptions()
_jsonOptions.PropertyNameCaseInsensitive = true;
#if NET8_0_OR_GREATER
- _jsonOptions.TypeInfoResolverChain.Add(PowertoolsLoggingSerializationContext.Default);
- foreach (var context in AdditionalContexts)
+
+ // Only add TypeInfoResolver if AOT mode
+ if (!RuntimeFeatureWrapper.IsDynamicCodeSupported)
{
- _jsonOptions.TypeInfoResolverChain.Add(context);
+ _jsonOptions.TypeInfoResolverChain.Add(PowertoolsLoggingSerializationContext.Default);
+ foreach (var context in AdditionalContexts)
+ {
+ _jsonOptions.TypeInfoResolverChain.Add(context);
+ }
}
#endif
return _jsonOptions;
diff --git a/libraries/tests/AWS.Lambda.Powertools.BatchProcessing.Tests/Handlers/SQS/Handler/HandlerFunction.cs b/libraries/tests/AWS.Lambda.Powertools.BatchProcessing.Tests/Handlers/SQS/Handler/HandlerFunction.cs
index 3c965372..9bfbf90b 100644
--- a/libraries/tests/AWS.Lambda.Powertools.BatchProcessing.Tests/Handlers/SQS/Handler/HandlerFunction.cs
+++ b/libraries/tests/AWS.Lambda.Powertools.BatchProcessing.Tests/Handlers/SQS/Handler/HandlerFunction.cs
@@ -30,7 +30,6 @@ public class HandlerFunction
public HandlerFunction()
{
-
}
public HandlerFunction(ISqsBatchProcessor batchProcessor, ISqsRecordHandler recordHandler)
@@ -38,20 +37,19 @@ public HandlerFunction(ISqsBatchProcessor batchProcessor, ISqsRecordHandler reco
_batchProcessor = batchProcessor;
_recordHandler = recordHandler;
}
-
-
+
[BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler))]
public BatchItemFailuresResponse HandlerUsingAttribute(SQSEvent _)
{
return SqsBatchProcessor.Result.BatchItemFailuresResponse;
}
-
+
[BatchProcessor(RecordHandler = typeof(CustomFailSqsRecordHandler))]
public BatchItemFailuresResponse HandlerUsingAttributeAllFail(SQSEvent _)
{
return SqsBatchProcessor.Result.BatchItemFailuresResponse;
}
-
+
[BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler), ErrorHandlingPolicy = BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure)]
public BatchItemFailuresResponse HandlerUsingAttributeErrorPolicy(SQSEvent _)
{
@@ -63,61 +61,60 @@ public Task HandlerUsingAttributeAsync(SQSEvent _)
{
return Task.FromResult(SqsBatchProcessor.Result.BatchItemFailuresResponse);
}
-
+
[BatchProcessor]
public BatchItemFailuresResponse HandlerUsingAttributeWithoutHandler(SQSEvent _)
{
return SqsBatchProcessor.Result.BatchItemFailuresResponse;
}
-
+
[BatchProcessor]
public BatchItemFailuresResponse HandlerUsingAttributeWithoutEvent(string _)
{
return SqsBatchProcessor.Result.BatchItemFailuresResponse;
}
-
+
[BatchProcessor(RecordHandler = typeof(BadCustomSqsRecordHandler))]
public BatchItemFailuresResponse HandlerUsingAttributeBadHandler(SQSEvent _)
{
return SqsBatchProcessor.Result.BatchItemFailuresResponse;
}
-
+
[BatchProcessor(BatchProcessor = typeof(BadCustomSqsRecordProcessor))]
public BatchItemFailuresResponse HandlerUsingAttributeBadProcessor(SQSEvent _)
{
return SqsBatchProcessor.Result.BatchItemFailuresResponse;
}
-
+
[BatchProcessor(BatchProcessorProvider = typeof(BadCustomSqsRecordProcessor))]
public BatchItemFailuresResponse HandlerUsingAttributeBadProcessorProvider(SQSEvent _)
{
return SqsBatchProcessor.Result.BatchItemFailuresResponse;
}
-
+
[BatchProcessor(RecordHandlerProvider = typeof(BadCustomSqsRecordHandler))]
public BatchItemFailuresResponse HandlerUsingAttributeBadHandlerProvider(SQSEvent _)
{
return SqsBatchProcessor.Result.BatchItemFailuresResponse;
}
-
+
[BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler), BatchProcessor = typeof(CustomSqsBatchProcessor))]
public BatchItemFailuresResponse HandlerUsingAttributeAndCustomBatchProcessor(SQSEvent _)
{
return SqsBatchProcessor.Result.BatchItemFailuresResponse;
}
-
+
[BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler), BatchProcessorProvider = typeof(CustomSqsBatchProcessorProvider))]
public BatchItemFailuresResponse HandlerUsingAttributeAndCustomBatchProcessorProvider(SQSEvent _)
{
return SqsBatchProcessor.Result.BatchItemFailuresResponse;
}
-
+
public async Task HandlerUsingUtility(SQSEvent sqsEvent)
{
var result = await SqsBatchProcessor.Instance.ProcessAsync(sqsEvent, RecordHandler.From(sqsMessage =>
{
var product = JsonSerializer.Deserialize(sqsMessage.Body);
-
if (product.GetProperty("Id").GetInt16() == 4)
{
throw new ArgumentException("Error on 4");
@@ -125,7 +122,7 @@ public async Task HandlerUsingUtility(SQSEvent sqsEve
}));
return result.BatchItemFailuresResponse;
}
-
+
public async Task HandlerUsingUtilityFromIoc(SQSEvent sqsEvent)
{
var batchProcessor = Services.Provider.GetRequiredService();
@@ -133,10 +130,70 @@ public async Task HandlerUsingUtilityFromIoc(SQSEvent
var result = await batchProcessor.ProcessAsync(sqsEvent, recordHandler);
return result.BatchItemFailuresResponse;
}
-
+
public async Task HandlerUsingUtilityFromIocConstructor(SQSEvent sqsEvent)
{
var result = await _batchProcessor.ProcessAsync(sqsEvent, _recordHandler);
return result.BatchItemFailuresResponse;
}
+
+ [BatchProcessor(RecordHandler = typeof(CustomFailSqsRecordHandler), ThrowOnFullBatchFailure = false)]
+ public BatchItemFailuresResponse HandlerUsingAttributeAllFail_ThrowOnFullBatchFailureFalseAttribute(SQSEvent _)
+ {
+ return SqsBatchProcessor.Result.BatchItemFailuresResponse;
+ }
+
+ [BatchProcessor(RecordHandler = typeof(CustomFailSqsRecordHandler))]
+ public BatchItemFailuresResponse HandlerUsingAttributeAllFail_ThrowOnFullBatchFailureFalseEnv(SQSEvent _)
+ {
+ return SqsBatchProcessor.Result.BatchItemFailuresResponse;
+ }
+
+ public async Task HandlerUsingUtilityAllFail_ThrowOnFullBatchFailureFalseOption(SQSEvent sqsEvent)
+ {
+ var result = await SqsBatchProcessor.Instance.ProcessAsync(sqsEvent,
+ RecordHandler.From(_ => throw new ArgumentException("Raise exception on all!")),
+ new ProcessingOptions
+ {
+ ThrowOnFullBatchFailure = false
+ });
+ return result.BatchItemFailuresResponse;
+ }
+
+ [BatchProcessor(
+ RecordHandler = typeof(CustomFailSqsRecordHandler),
+ ErrorHandlingPolicy = BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure,
+ ThrowOnFullBatchFailure = false)]
+ public BatchItemFailuresResponse HandlerUsingAttributeFailAll_StopOnFirstErrorAttr_ThrowOnFullBatchFailureFalseAttr(SQSEvent _)
+ {
+ return SqsBatchProcessor.Result.BatchItemFailuresResponse;
+ }
+
+ [BatchProcessor(
+ RecordHandler = typeof(CustomFailSqsRecordHandler),
+ ErrorHandlingPolicy = BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure)]
+ public BatchItemFailuresResponse HandlerUsingAttributeFailAll_StopOnFirstErrorAttr_ThrowOnFullBatchFailureFalseEnv(SQSEvent _)
+ {
+ return SqsBatchProcessor.Result.BatchItemFailuresResponse;
+ }
+
+ public async Task HandlerUsingUtility_StopOnFirstErrorOption_ThrowOnFullBatchFailureFalseOption(SQSEvent sqsEvent)
+ {
+ var result = await SqsBatchProcessor.Instance.ProcessAsync(sqsEvent,
+ RecordHandler.From(async record =>
+ {
+ var product = JsonSerializer.Deserialize(record.Body);
+ if (product.GetProperty("Id").GetInt16() == 4)
+ {
+ throw new ArgumentException("Error on 4");
+ }
+ return await Task.FromResult(RecordHandlerResult.None);
+ }),
+ new ProcessingOptions
+ {
+ ErrorHandlingPolicy = BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure,
+ ThrowOnFullBatchFailure = false
+ });
+ return result.BatchItemFailuresResponse;
+ }
}
\ No newline at end of file
diff --git a/libraries/tests/AWS.Lambda.Powertools.BatchProcessing.Tests/Handlers/SQS/HandlerTests.cs b/libraries/tests/AWS.Lambda.Powertools.BatchProcessing.Tests/Handlers/SQS/HandlerTests.cs
index c81d7f7b..04bd0b21 100644
--- a/libraries/tests/AWS.Lambda.Powertools.BatchProcessing.Tests/Handlers/SQS/HandlerTests.cs
+++ b/libraries/tests/AWS.Lambda.Powertools.BatchProcessing.Tests/Handlers/SQS/HandlerTests.cs
@@ -43,8 +43,8 @@ public Task Sqs_Handler_Using_Attribute()
Assert.Equal("4", response.BatchItemFailures[1].ItemIdentifier);
return Task.CompletedTask;
- }
-
+ }
+
[Fact]
public Task Sqs_Handler_All_Fail_Using_Attribute_Should_Throw_BatchProcessingException()
{
@@ -186,9 +186,152 @@ public Task Sqs_Handler_Using_Attribute_Error_Policy_Attribute_StopOnFirstBatchI
return Task.CompletedTask;
}
-
+
+ [Fact]
+ public Task Sqs_Handler_Using_Attribute_All_Fail_Should_Not_Throw_BatchProcessingException_With_Throw_On_Full_Batch_Failure_False_Attribute()
+ {
+ // Arrange
+ var request = new SQSEvent
+ {
+ Records = TestHelper.SqsMessages
+ };
+ var function = new HandlerFunction();
+
+ // Act
+ var response = function.HandlerUsingAttributeAllFail_ThrowOnFullBatchFailureFalseAttribute(request);
+
+ // Assert
+ Assert.Equal(5, response.BatchItemFailures.Count);
+ Assert.Equal("1", response.BatchItemFailures[0].ItemIdentifier);
+ Assert.Equal("2", response.BatchItemFailures[1].ItemIdentifier);
+ Assert.Equal("3", response.BatchItemFailures[2].ItemIdentifier);
+ Assert.Equal("4", response.BatchItemFailures[3].ItemIdentifier);
+ Assert.Equal("5", response.BatchItemFailures[4].ItemIdentifier);
+
+ return Task.CompletedTask;
+ }
+
+ [Fact]
+ public Task Sqs_Handler_Using_Attribute_All_Fail_Should_Not_Throw_BatchProcessingException_With_Throw_On_Full_Batch_Failure_False_Env()
+ {
+ // Arrange
+ Environment.SetEnvironmentVariable("POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE", "false");
+ var request = new SQSEvent
+ {
+ Records = TestHelper.SqsMessages
+ };
+ var function = new HandlerFunction();
+
+ // Act
+ var response = function.HandlerUsingAttributeAllFail_ThrowOnFullBatchFailureFalseEnv(request);
+
+ // Assert
+ Assert.Equal(5, response.BatchItemFailures.Count);
+ Assert.Equal("1", response.BatchItemFailures[0].ItemIdentifier);
+ Assert.Equal("2", response.BatchItemFailures[1].ItemIdentifier);
+ Assert.Equal("3", response.BatchItemFailures[2].ItemIdentifier);
+ Assert.Equal("4", response.BatchItemFailures[3].ItemIdentifier);
+ Assert.Equal("5", response.BatchItemFailures[4].ItemIdentifier);
+
+ return Task.CompletedTask;
+ }
+
+ [Fact]
+ public async Task Sqs_Handler_Using_Utility_All_Fail_Should_Not_Throw_BatchProcessingException_With_Throw_On_Full_Batch_Failure_False_Option()
+ {
+ // Arrange
+ var request = new SQSEvent
+ {
+ Records = TestHelper.SqsMessages
+ };
+ var function = new HandlerFunction();
+
+ // Act
+ var response = await function.HandlerUsingUtilityAllFail_ThrowOnFullBatchFailureFalseOption(request);
+
+ // Assert
+ Assert.Equal(5, response.BatchItemFailures.Count);
+ Assert.Equal("1", response.BatchItemFailures[0].ItemIdentifier);
+ Assert.Equal("2", response.BatchItemFailures[1].ItemIdentifier);
+ Assert.Equal("3", response.BatchItemFailures[2].ItemIdentifier);
+ Assert.Equal("4", response.BatchItemFailures[3].ItemIdentifier);
+ Assert.Equal("5", response.BatchItemFailures[4].ItemIdentifier);
+ }
+
+ [Fact]
+ public Task Sqs_Fifo_Handler_Using_Attribute_All_Fail_With_Stop_On_First_Error_Attr_Should_Not_Throw_BatchProcessingException_With_Throw_On_Full_Batch_Failure_False_Attribute()
+ {
+ // Arrange
+ var request = new SQSEvent
+ {
+ Records = TestHelper.SqsFifoMessagesWithFirstMessagePoisened
+ };
+ var function = new HandlerFunction();
+
+ // Act
+ var response = function.HandlerUsingAttributeFailAll_StopOnFirstErrorAttr_ThrowOnFullBatchFailureFalseAttr(request);
+
+ // Assert
+ Assert.Equal(5, response.BatchItemFailures.Count);
+ Assert.Equal("1", response.BatchItemFailures[0].ItemIdentifier);
+ Assert.Equal("2", response.BatchItemFailures[1].ItemIdentifier);
+ Assert.Equal("3", response.BatchItemFailures[2].ItemIdentifier);
+ Assert.Equal("4", response.BatchItemFailures[3].ItemIdentifier);
+ Assert.Equal("5", response.BatchItemFailures[4].ItemIdentifier);
+
+ return Task.CompletedTask;
+ }
+
+ [Fact]
+ public Task Sqs_Fifo_Handler_Using_Attribute_All_Fail_With_Stop_On_First_Error_Attr_Should_Not_Throw_BatchProcessingException_With_Throw_On_Full_Batch_Failure_False_Env()
+ {
+ // Arrange
+ Environment.SetEnvironmentVariable("POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE", "false");
+ var request = new SQSEvent
+ {
+ Records = TestHelper.SqsFifoMessagesWithFirstMessagePoisened
+ };
+ var function = new HandlerFunction();
+
+ // Act
+ var response = function.HandlerUsingAttributeFailAll_StopOnFirstErrorAttr_ThrowOnFullBatchFailureFalseEnv(request);
+
+ // Assert
+ Assert.Equal(5, response.BatchItemFailures.Count);
+ Assert.Equal("1", response.BatchItemFailures[0].ItemIdentifier);
+ Assert.Equal("2", response.BatchItemFailures[1].ItemIdentifier);
+ Assert.Equal("3", response.BatchItemFailures[2].ItemIdentifier);
+ Assert.Equal("4", response.BatchItemFailures[3].ItemIdentifier);
+ Assert.Equal("5", response.BatchItemFailures[4].ItemIdentifier);
+
+ return Task.CompletedTask;
+ }
+
+ [Fact]
+ public async Task Sqs_Fifo_Handler_Using_Utility_All_Fail_With_Stop_On_First_Error_Attr_Should_Not_Throw_BatchProcessingException_With_Throw_On_Full_Batch_Failure_False_Option()
+ {
+ // Arrange
+ var request = new SQSEvent
+ {
+ Records = TestHelper.SqsFifoMessagesWithFirstMessagePoisened
+ };
+ var function = new HandlerFunction();
+
+ // Act
+ var response = await function.HandlerUsingUtility_StopOnFirstErrorOption_ThrowOnFullBatchFailureFalseOption(request);
+
+ // Assert
+ Assert.Equal(5, response.BatchItemFailures.Count);
+ Assert.Equal("1", response.BatchItemFailures[0].ItemIdentifier);
+ Assert.Equal("2", response.BatchItemFailures[1].ItemIdentifier);
+ Assert.Equal("3", response.BatchItemFailures[2].ItemIdentifier);
+ Assert.Equal("4", response.BatchItemFailures[3].ItemIdentifier);
+ Assert.Equal("5", response.BatchItemFailures[4].ItemIdentifier);
+ }
+
public void Dispose()
{
+ Environment.SetEnvironmentVariable("POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE", "true");
Environment.SetEnvironmentVariable("POWERTOOLS_BATCH_PARALLEL_ENABLED", "false");
Environment.SetEnvironmentVariable("POWERTOOLS_BATCH_ERROR_HANDLING_POLICY", null);
}
diff --git a/libraries/tests/AWS.Lambda.Powertools.BatchProcessing.Tests/Helpers/Helpers.cs b/libraries/tests/AWS.Lambda.Powertools.BatchProcessing.Tests/Helpers/Helpers.cs
index 5b3cc8df..da090c51 100644
--- a/libraries/tests/AWS.Lambda.Powertools.BatchProcessing.Tests/Helpers/Helpers.cs
+++ b/libraries/tests/AWS.Lambda.Powertools.BatchProcessing.Tests/Helpers/Helpers.cs
@@ -32,7 +32,7 @@ internal static class Helpers
new SQSEvent.SQSMessage
{
MessageId = "1",
- Body = "{\"Id\":1,\"Name\":\"product-4\",\"Price\":14}",
+ Body = "{\"Id\":1,\"Name\":\"product-1\",\"Price\":14}",
EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue"
},
new SQSEvent.SQSMessage
@@ -44,7 +44,7 @@ internal static class Helpers
new SQSEvent.SQSMessage
{
MessageId = "3",
- Body = "{\"Id\":3,\"Name\":\"product-4\",\"Price\":14}",
+ Body = "{\"Id\":3,\"Name\":\"product-3\",\"Price\":14}",
EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue"
},
new SQSEvent.SQSMessage
@@ -56,7 +56,7 @@ internal static class Helpers
new SQSEvent.SQSMessage
{
MessageId = "5",
- Body = "{\"Id\":5,\"Name\":\"product-4\",\"Price\":14}",
+ Body = "{\"Id\":5,\"Name\":\"product-5\",\"Price\":14}",
EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue"
},
};
@@ -66,7 +66,7 @@ internal static class Helpers
new SQSEvent.SQSMessage
{
MessageId = "1",
- Body = "{\"Id\":1,\"Name\":\"product-4\",\"Price\":14}",
+ Body = "{\"Id\":1,\"Name\":\"product-1\",\"Price\":14}",
EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue.fifo"
},
new SQSEvent.SQSMessage
@@ -78,7 +78,7 @@ internal static class Helpers
new SQSEvent.SQSMessage
{
MessageId = "3",
- Body = "{\"Id\":3,\"Name\":\"product-4\",\"Price\":14}",
+ Body = "{\"Id\":3,\"Name\":\"product-3\",\"Price\":14}",
EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue.fifo"
},
new SQSEvent.SQSMessage
@@ -90,11 +90,45 @@ internal static class Helpers
new SQSEvent.SQSMessage
{
MessageId = "5",
- Body = "{\"Id\":5,\"Name\":\"product-4\",\"Price\":14}",
+ Body = "{\"Id\":5,\"Name\":\"product-5\",\"Price\":14}",
EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue.fifo"
},
};
+ internal static List<SQSEvent.SQSMessage> SqsFifoMessagesWithFirstMessagePoisened =>
+ [
+ new SQSEvent.SQSMessage
+ {
+ MessageId = "1",
+ Body = "fail",
+ EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue.fifo"
+ },
+ new SQSEvent.SQSMessage
+ {
+ MessageId = "2",
+ Body = "{\"Id\":2,\"Name\":\"product-2\",\"Price\":14}",
+ EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue.fifo"
+ },
+ new SQSEvent.SQSMessage
+ {
+ MessageId = "3",
+ Body = "{\"Id\":3,\"Name\":\"product-3\",\"Price\":14}",
+ EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue.fifo"
+ },
+ new SQSEvent.SQSMessage
+ {
+ MessageId = "4",
+ Body = "{\"Id\":4,\"Name\":\"product-4\",\"Price\":14}",
+ EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue.fifo"
+ },
+ new SQSEvent.SQSMessage
+ {
+ MessageId = "5",
+ Body = "{\"Id\":5,\"Name\":\"product-5\",\"Price\":14}",
+ EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue.fifo"
+ }
+ ];
+
internal static List<DynamoDBEvent.DynamodbStreamRecord> DynamoDbMessages => new()
{
new DynamoDBEvent.DynamodbStreamRecord
diff --git a/libraries/tests/AWS.Lambda.Powertools.Logging.Tests/Serializers/PowertoolsLoggingSerializerTests.cs b/libraries/tests/AWS.Lambda.Powertools.Logging.Tests/Serializers/PowertoolsLoggingSerializerTests.cs
index e26053ac..8c539036 100644
--- a/libraries/tests/AWS.Lambda.Powertools.Logging.Tests/Serializers/PowertoolsLoggingSerializerTests.cs
+++ b/libraries/tests/AWS.Lambda.Powertools.Logging.Tests/Serializers/PowertoolsLoggingSerializerTests.cs
@@ -15,10 +15,12 @@
using System;
using System.Collections.Generic;
+using System.Runtime.CompilerServices;
using System.Text.Encodings.Web;
using System.Text.Json;
using System.Text.Json.Serialization;
using Amazon.Lambda.Serialization.SystemTextJson;
+using AWS.Lambda.Powertools.Common.Utils;
using AWS.Lambda.Powertools.Logging.Internal;
using AWS.Lambda.Powertools.Logging.Internal.Converters;
using AWS.Lambda.Powertools.Logging.Serializers;
@@ -48,8 +50,10 @@ public void SerializerOptions_ShouldNotBeNull()
[Fact]
public void SerializerOptions_ShouldHaveCorrectDefaultSettings()
{
- var options = PowertoolsLoggingSerializer.GetSerializerOptions();
-
+ RuntimeFeatureWrapper.SetIsDynamicCodeSupported(false);
+
+ var options = PowertoolsLoggingSerializer.GetSerializerOptions();
+
Assert.Collection(options.Converters,
converter => Assert.IsType(converter),
converter => Assert.IsType(converter),
@@ -70,6 +74,33 @@ public void SerializerOptions_ShouldHaveCorrectDefaultSettings()
resolver => Assert.IsType(resolver));
#endif
}
+
+ [Fact]
+ public void SerializerOptions_ShouldHaveCorrectDefaultSettings_WhenDynamic()
+ {
+ RuntimeFeatureWrapper.SetIsDynamicCodeSupported(true);
+
+ var options = PowertoolsLoggingSerializer.GetSerializerOptions();
+
+ Assert.Collection(options.Converters,
+ converter => Assert.IsType(converter),
+ converter => Assert.IsType(converter),
+ converter => Assert.IsType(converter),
+ converter => Assert.IsType(converter),
+ converter => Assert.IsType(converter),
+ converter => Assert.IsType(converter),
+#if NET8_0_OR_GREATER
+ converter => Assert.IsType>(converter));
+#elif NET6_0
+ converter => Assert.IsType(converter));
+#endif
+
+ Assert.Equal(JavaScriptEncoder.UnsafeRelaxedJsonEscaping, options.Encoder);
+
+#if NET8_0_OR_GREATER
+ Assert.Empty(options.TypeInfoResolverChain);
+#endif
+ }
[Fact]
public void SerializerOptions_ShouldUseSnakeCaseByDefault()
@@ -143,6 +174,7 @@ public void Serialize_UnknownType_ThrowsInvalidOperationException()
// Arrange
var unknownObject = new UnknownType();
+ RuntimeFeatureWrapper.SetIsDynamicCodeSupported(false);
// Act & Assert
var exception = Assert.Throws<InvalidOperationException>(() =>
PowertoolsLoggingSerializer.Serialize(unknownObject, typeof(UnknownType)));
@@ -150,6 +182,20 @@ public void Serialize_UnknownType_ThrowsInvalidOperationException()
Assert.Contains("is not known to the serializer", exception.Message);
Assert.Contains(typeof(UnknownType).ToString(), exception.Message);
}
+
+ [Fact]
+ public void Serialize_UnknownType_Should_Not_Throw_InvalidOperationException_When_Dynamic()
+ {
+ // Arrange
+ var unknownObject = new UnknownType { SomeProperty = "Hello" };
+
+ RuntimeFeatureWrapper.SetIsDynamicCodeSupported(true);
+ // Act & Assert
+ var expected =
+ PowertoolsLoggingSerializer.Serialize(unknownObject, typeof(UnknownType));
+
+ Assert.Equal("{\"some_property\":\"Hello\"}", expected);
+ }
private class UnknownType
{
@@ -175,5 +221,6 @@ public void Dispose()
PowertoolsLoggingSerializer.ClearContext();
#endif
PowertoolsLoggingSerializer.ClearOptions();
+ RuntimeFeatureWrapper.Reset();
}
}
\ No newline at end of file
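These serializer tests toggle `RuntimeFeatureWrapper.SetIsDynamicCodeSupported` to cover both the reflection-based path and the Native AOT (source-generated) path without publishing an AOT binary. Only `SetIsDynamicCodeSupported` and `Reset` are visible in this diff, so the wrapper shape below is a hedged sketch of what such a test hook typically looks like, built on the real `RuntimeFeature.IsDynamicCodeSupported` API:

```csharp
// Hedged sketch of the test hook used above; the real RuntimeFeatureWrapper lives in
// AWS.Lambda.Powertools.Common.Utils and may differ in detail.
using System.Runtime.CompilerServices;

public static class RuntimeFeatureWrapper
{
    private static bool? _override;

    // Serializer code consults this instead of RuntimeFeature directly, so tests can
    // simulate both dynamic (reflection) and Native AOT code paths.
    public static bool IsDynamicCodeSupported => _override ?? RuntimeFeature.IsDynamicCodeSupported;

    public static void SetIsDynamicCodeSupported(bool value) => _override = value;

    public static void Reset() => _override = null;
}
```

Resetting the override in each test's `Dispose()` (as the diff does) keeps the AOT-specific assertions from leaking into unrelated tests.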
diff --git a/version.json b/version.json
index bad6b158..1a74e63c 100644
--- a/version.json
+++ b/version.json
@@ -1,12 +1,12 @@
{
"Core": {
- "Logging": "1.6.0",
+ "Logging": "1.6.1",
"Metrics": "1.7.1",
"Tracing": "1.5.1"
},
"Utilities": {
"Parameters": "1.3.0",
"Idempotency": "1.2.2",
- "BatchProcessing": "1.1.2"
+ "BatchProcessing": "1.2.0"
}
}