feat: Add Batch Processing Utility #337

Merged
merged 50 commits into from
Sep 19, 2023

@lachriz-aws lachriz-aws commented Jun 30, 2023

Issue number: #168

Summary

Changes

This PR adds the Batch Processing Utility to Powertools for AWS Lambda (.NET).

This adds straightforward support for the AWS Lambda function response type ReportBatchItemFailures, which lets a function report partial batch item failures and thereby reduces the number of items that are re-processed. The utility automatically monitors the processing of each item within a batch and reports which items failed. This enables developers to focus on writing business logic while the Batch Processing Utility handles the reporting of partial failures within a batch.
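
For orientation, the partial batch response that Lambda expects is a JSON object with a `batchItemFailures` array whose entries each carry an `itemIdentifier`. The sketch below builds that shape by hand with System.Text.Json. The type names `ItemFailureSketch`, `ResponseSketch`, and `ResponseShape` are hypothetical and are not the types shipped by this PR.

```csharp
using System.Collections.Generic;
using System.Text.Json;
using System.Text.Json.Serialization;

// Hypothetical types for illustration only; not the utility's own response classes.
public sealed class ItemFailureSketch
{
    [JsonPropertyName("itemIdentifier")]
    public string ItemIdentifier { get; set; } = "";
}

public sealed class ResponseSketch
{
    [JsonPropertyName("batchItemFailures")]
    public List<ItemFailureSketch> BatchItemFailures { get; set; } = new();
}

public static class ResponseShape
{
    // Serializes the identifiers of failed records into the partial batch response shape.
    public static string ToJson(IEnumerable<string> failedIds)
    {
        var response = new ResponseSketch();
        foreach (var id in failedIds)
        {
            response.BatchItemFailures.Add(new ItemFailureSketch { ItemIdentifier = id });
        }
        return JsonSerializer.Serialize(response);
    }
}
```

An empty `batchItemFailures` array tells Lambda that the whole batch succeeded.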

User experience

Add the NuGet package:

dotnet add package AWS.Lambda.Powertools.BatchProcessing

Consuming batches from SQS
Function handler using attribute-based configuration:

[BatchProcesser(RecordHandler = typeof(CustomSqsRecordHandler))]
public BatchItemFailuresResponse SqsHandlerUsingAttribute(SQSEvent _)
{
    return SqsBatchProcessor.Instance.ProcessingResult.BatchItemFailuresResponse;
}

Implementation of CustomSqsRecordHandler:

public class CustomSqsRecordHandler : IRecordHandler<SQSEvent.SQSMessage>
{
    public async Task<RecordHandlerResult> HandleAsync(SQSEvent.SQSMessage record, CancellationToken cancellationToken)
    {
        /*
         * Your business logic.
         * If an exception is thrown, the record will be marked as a partial batch item failure.
         */
        return await Task.FromResult(RecordHandlerResult.None);
    }
}

With this, the framework automatically creates an instance of a batch processor and uses the provided record handler for the per-record processing logic. All logic around catching exceptions and keeping track of partial batch item failures is handled automatically by the framework. On top of that, there is FIFO-specific logic for handling partial failures. From the docs:

If you're using this feature with a FIFO queue, your function should stop processing messages after the first failure and return all failed and unprocessed messages in batchItemFailures. This helps preserve the ordering of messages in your queue.

... and this is also handled automatically by the framework.
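
As a rough illustration of the FIFO behaviour quoted above (not the utility's actual implementation), the sketch below processes message IDs in order, stops at the first failure, and reports the failed message together with every message that was not yet processed. `FifoSketch`, `CollectFailures`, and the handler delegate are hypothetical names used only for this example.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration only; not part of the Powertools API.
public static class FifoSketch
{
    // Processes IDs in order. On the first failure, the failed ID and all
    // remaining (unprocessed) IDs are returned as batch item failures.
    public static List<string> CollectFailures(IReadOnlyList<string> messageIds, Action<string> handler)
    {
        var failures = new List<string>();
        for (var i = 0; i < messageIds.Count; i++)
        {
            try
            {
                handler(messageIds[i]);
            }
            catch (Exception)
            {
                // Stop processing: report this record and everything after it.
                for (var j = i; j < messageIds.Count; j++)
                {
                    failures.Add(messageIds[j]);
                }
                break;
            }
        }
        return failures;
    }
}
```

Returning the unprocessed messages along with the failed one is what lets the queue redeliver them in their original order.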

Consuming batches from DynamoDB Streams

[BatchProcesser(RecordHandler = typeof(CustomDynamoDbStreamRecordHandler))]
public BatchItemFailuresResponse DynamoDbStreamHandlerUsingAttribute(DynamoDBEvent _)
{
    return DynamoDbStreamBatchProcessor.Instance.ProcessingResult.BatchItemFailuresResponse;
}

Consuming batches from Kinesis Data Streams

[BatchProcesser(RecordHandler = typeof(CustomKinesisDataStreamRecordHandler))]
public BatchItemFailuresResponse KinesisDataStreamHandlerUsingAttribute(KinesisEvent _)
{
    return KinesisDataStreamBatchProcessor.Instance.ProcessingResult.BatchItemFailuresResponse;
}

Consuming batches using the utility

public async Task<BatchItemFailuresResponse> HandlerUsingUtility(SQSEvent sqsEvent)
{
    var result = await SqsBatchProcessor.Instance.ProcessAsync(sqsEvent, RecordHandler<SQSEvent.SQSMessage>.From(x =>
    {
        Logger.LogInformation($"Inline handling of SQS message with body: '{x.Body}'.");
    }));
    return result.BatchItemFailuresResponse;
}

Consuming batches using processor and handler from IoC

public async Task<BatchItemFailuresResponse> HandlerUsingUtilityFromIoc(SQSEvent sqsEvent)
{
    var batchProcessor = Services.Provider.GetRequiredService<CustomSqsBatchProcessor>();
    var recordHandler = Services.Provider.GetRequiredService<CustomSqsRecordHandler>();
    var result = await batchProcessor.ProcessAsync(sqsEvent, recordHandler);
    return result.BatchItemFailuresResponse;
}

Configuration
The Batch Processing Utility supports the following configuration:

  • MaxDegreeOfParallelism: Controls the parallelism of the batch item processing. With a value of 1 (default), processing is sequential. Sequential processing is recommended when preserving order is important, e.g. with SQS FIFO queues. With a value > 1, processing is done in parallel, which can let a batch complete faster, e.g. when each item makes downstream service calls. With a value of -1, the parallelism is automatically set to the vCPU count of the Lambda function. Internally, the Batch Processing Utility uses the Parallel.ForEachAsync method and the ParallelOptions.MaxDegreeOfParallelism property to enable this functionality.

  • ErrorHandlingPolicy: Controls the error handling policy of the batch item processing. With a value of DeriveFromEvent (default), the specific BatchProcessor determines the policy based on the incoming event. For example, the SqsBatchProcessor looks at the EventSourceArn to determine whether the ErrorHandlingPolicy should be StopOnFirstBatchItemFailure (for FIFO queues) or ContinueOnBatchItemFailure (for standard queues). With StopOnFirstBatchItemFailure, the batch processor stops processing and marks any remaining records as batch item failures. With ContinueOnBatchItemFailure, the batch processor continues processing batch items regardless of item failures.
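
The two settings above can be sketched roughly as follows. This is a minimal illustration, not the utility's code. It assumes the FIFO check is a test for the `.fifo` suffix that FIFO queue names (and therefore their ARNs) carry, and it shows continue-on-failure processing with `Parallel.ForEachAsync` (requires .NET 6 or later). `PolicySketch`, `ConfigSketch`, `DerivePolicy`, and `ProcessAsync` are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical names for illustration only; not the utility's own types.
public enum PolicySketch { StopOnFirstBatchItemFailure, ContinueOnBatchItemFailure }

public static class ConfigSketch
{
    // Assumption: a FIFO queue is recognised by the ".fifo" suffix of its ARN.
    public static PolicySketch DerivePolicy(string eventSourceArn) =>
        eventSourceArn.EndsWith(".fifo", StringComparison.Ordinal)
            ? PolicySketch.StopOnFirstBatchItemFailure
            : PolicySketch.ContinueOnBatchItemFailure;

    // Continue-on-failure processing: every record is attempted, and the IDs of
    // records whose handler throws are collected as failures.
    public static async Task<IReadOnlyCollection<string>> ProcessAsync(
        IEnumerable<string> ids,
        Func<string, CancellationToken, ValueTask> handler,
        int maxDegreeOfParallelism)
    {
        var failures = new ConcurrentBag<string>();
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism };

        await Parallel.ForEachAsync(ids, options, async (id, ct) =>
        {
            try
            {
                await handler(id, ct);
            }
            catch (Exception)
            {
                // Thread-safe collection, since handlers may run concurrently.
                failures.Add(id);
            }
        });

        return failures;
    }
}
```

With `maxDegreeOfParallelism` set to 1 the records are handled one at a time, which matches the sequential default described above.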

The configuration items can be set in different ways:

  • Using POWERTOOLS_ environment variables.
  • Using the BatchProcesser attribute.
  • Using the ProcessingOptions object passed to BatchProcessor.ProcessAsync().

Checklist

Please leave checklist items unchecked if they do not apply to your change.

Is this a breaking change? No

RFC issue number: #168

Checklist:

  • Migration process documented
  • Implement warnings (if it can live side by side)

Acknowledgment

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

Disclaimer: We value your time and bandwidth. As such, any pull requests created on non-triaged issues might not be successful.

@boring-cyborg boring-cyborg bot added the internal Maintenance changes label Jun 30, 2023

boring-cyborg bot commented Jun 30, 2023

Thanks a lot for your first contribution! Please check out our contributing guidelines and don't hesitate to ask whatever you need.
In the meantime, check out the #python channel on our Powertools for AWS Lambda Discord: Invite link

@pull-request-size pull-request-size bot added the size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. label Jun 30, 2023
lachriz-aws commented

I'm still finishing off a few things, but wanted to get the draft PR going. My goal is to get the feature ready for review during the weekend.

@boring-cyborg boring-cyborg bot added the documentation Improvements or additions to documentation label Jul 2, 2023
…ing results. We are now down to zero warnings.
lachriz-aws commented Jul 2, 2023

The PR is now more or less ready to be reviewed :) Will sync with @hjgraca before submitting.

@hjgraca hjgraca added area/batch Batch utility do-not-merge PRs that are blocked for varying reasons labels Jul 3, 2023
@hjgraca hjgraca added this to the Batch GA Release milestone Sep 7, 2023

@amirkaws amirkaws left a comment

I have committed all changes into the PR.

hjgraca commented Sep 15, 2023

Thanks for reviewing @amirkaws

@sonarqubecloud

Kudos, SonarCloud Quality Gate passed!

0 Bugs (rating A)
0 Vulnerabilities (rating A)
0 Security Hotspots (rating A)
38 Code Smells (rating A)

No coverage information
0.0% duplication

@hjgraca hjgraca merged commit bcc0ff1 into aws-powertools:develop Sep 19, 2023

boring-cyborg bot commented Sep 19, 2023

Awesome work, congrats on your first merged pull request and thank you for helping improve everyone's experience!

Labels
area/batch Batch utility do-not-merge PRs that are blocked for varying reasons documentation Improvements or additions to documentation feature New features or minor changes internal Maintenance changes size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files.
Development

Successfully merging this pull request may close these issues.

RFC: Add Batch Processing Utility
4 participants