From f8a73909aa1f4be42705e34a73cf9127466c8ba0 Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Thu, 14 Mar 2024 10:31:22 +0000 Subject: [PATCH 01/13] Adding a flag to SqsFifoProcessor to allow message processing to continue --- .../batch/sqs_fifo_partial_processor.py | 26 +++++-- docs/utilities/batch.md | 8 +- tests/functional/test_utilities_batch.py | 75 +++++++++++++++++-- 3 files changed, 93 insertions(+), 16 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py b/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py index d48749a137e..e7c24be488b 100644 --- a/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py +++ b/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py @@ -57,7 +57,8 @@ def lambda_handler(event, context: LambdaContext): None, ) - def __init__(self, model: Optional["BatchSqsTypeModel"] = None): + def __init__(self, model: Optional["BatchSqsTypeModel"] = None, return_on_first_error: bool = True): + self.return_on_first_error = return_on_first_error super().__init__(EventType.SQS, model) def process(self) -> List[Tuple]: @@ -68,13 +69,26 @@ def process(self) -> List[Tuple]: result: List[Tuple] = [] for i, record in enumerate(self.records): - # If we have failed messages, it means that the last message failed. - # We then short circuit the process, failing the remaining messages - if self.fail_messages: + # If we have failed messages and we are set to return on the first error, + # short circuit the process and return the remaining messages as failed items + if self.fail_messages and self.return_on_first_error: return self._short_circuit_processing(i, result) - # Otherwise, process the message normally - result.append(self._process_record(record)) + # Process the current record + processed_messages = self._process_record(record) + + # If a processed message fail, + # mark subsequent messages from the same MessageGroupId as skipped + if processed_messages[0] == "fail": + for subsequent_record in self.records[i + 1 :]: + if subsequent_record.get("attributes", {}).get("MessageGroupId") == record.get( + "attributes", + {}, + ).get("MessageGroupId"): + continue # Skip subsequent message from the same MessageGroupId + + # Append the processed message normally + result.append(processed_messages) return result diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index ada05766ab4..ce75e94efc8 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -141,8 +141,12 @@ Processing batches from SQS works in three stages: #### FIFO queues -When using [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html){target="_blank" rel="nofollow"}, we will stop processing messages after the first failure, and return all failed and unprocessed messages in `batchItemFailures`. -This helps preserve the ordering of messages in your queue. +When working with [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html){target="_blank"}, it's important to know that a batch sent from SQS to Lambda can include multiple messages from different group IDs. + +By default, message processing halts after the initial failure, returning all failed and unprocessed messages in `batchItemFailures` to preserve the ordering of messages in your queue. 
However, customers can opt to continue processing messages and retrieve failed messages within a message group ID by setting `return_on_first_error` to False. + +???+ notice "Having problems with DLQ?" + `AsyncBatchProcessor` uses `asyncio.gather`. This might cause [side effects and reach trace limits at high concurrency](../core/tracer.md#concurrent-asynchronous-functions){target="_blank"}. === "Recommended" diff --git a/tests/functional/test_utilities_batch.py b/tests/functional/test_utilities_batch.py index e146d65744f..00cbace7d2c 100644 --- a/tests/functional/test_utilities_batch.py +++ b/tests/functional/test_utilities_batch.py @@ -38,6 +38,32 @@ from tests.functional.utils import b64_to_str, str_to_b64 +@pytest.fixture(scope="module") +def sqs_event_fifo_factory() -> Callable: + def factory(body: str, message_group_id: str = ""): + return { + "messageId": f"{uuid.uuid4()}", + "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a", + "body": body, + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1703675223472", + "SequenceNumber": "18882884930918384133", + "MessageGroupId": message_group_id, + "SenderId": "SenderId", + "MessageDeduplicationId": "1eea03c3f7e782c7bdc2f2a917f40389314733ff39f5ab16219580c0109ade98", + "ApproximateFirstReceiveTimestamp": "1703675223484", + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-1", + } + + return factory + + @pytest.fixture(scope="module") def sqs_event_factory() -> Callable: def factory(body: str): @@ -48,7 +74,7 @@ def factory(body: str): "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1545082649183", - "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "SenderId": "SenderId", "ApproximateFirstReceiveTimestamp": "1545082649185", }, "messageAttributes": {}, @@ -660,10 +686,10 @@ def lambda_handler(event, context): assert "All records failed processing. 
" in str(e.value) -def test_sqs_fifo_batch_processor_middleware_success_only(sqs_event_factory, record_handler): +def test_sqs_fifo_batch_processor_middleware_success_only(sqs_event_fifo_factory, record_handler): # GIVEN - first_record = SQSRecord(sqs_event_factory("success")) - second_record = SQSRecord(sqs_event_factory("success")) + first_record = SQSRecord(sqs_event_fifo_factory("success")) + second_record = SQSRecord(sqs_event_fifo_factory("success")) event = {"Records": [first_record.raw_event, second_record.raw_event]} processor = SqsFifoPartialProcessor() @@ -679,12 +705,12 @@ def lambda_handler(event, context): assert result["batchItemFailures"] == [] -def test_sqs_fifo_batch_processor_middleware_with_failure(sqs_event_factory, record_handler): +def test_sqs_fifo_batch_processor_middleware_with_failure(sqs_event_fifo_factory, record_handler): # GIVEN - first_record = SQSRecord(sqs_event_factory("success")) - second_record = SQSRecord(sqs_event_factory("fail")) + first_record = SQSRecord(sqs_event_fifo_factory("success")) + second_record = SQSRecord(sqs_event_fifo_factory("fail")) # this would normally succeed, but since it's a FIFO queue, it will be marked as failure - third_record = SQSRecord(sqs_event_factory("success")) + third_record = SQSRecord(sqs_event_fifo_factory("success")) event = {"Records": [first_record.raw_event, second_record.raw_event, third_record.raw_event]} processor = SqsFifoPartialProcessor() @@ -702,6 +728,39 @@ def lambda_handler(event, context): assert result["batchItemFailures"][1]["itemIdentifier"] == third_record.message_id +def test_sqs_fifo_batch_processor_middleware_without_first_failure(sqs_event_fifo_factory, record_handler): + # GIVEN a batch of 5 records with 3 different MessageGroupID + first_record = SQSRecord(sqs_event_fifo_factory("success", "1")) + second_record = SQSRecord(sqs_event_fifo_factory("success", "1")) + third_record = SQSRecord(sqs_event_fifo_factory("fail", "2")) + fourth_record = SQSRecord(sqs_event_fifo_factory("fail", "2")) + fifth_record = SQSRecord(sqs_event_fifo_factory("success", "3")) + event = { + "Records": [ + first_record.raw_event, + second_record.raw_event, + third_record.raw_event, + fourth_record.raw_event, + fifth_record.raw_event, + ], + } + + # WHEN the FIFO processor is set to continue processing even after encountering errors in specific MessageGroupID + processor = SqsFifoPartialProcessor(return_on_first_error=False) + + @batch_processor(record_handler=record_handler, processor=processor) + def lambda_handler(event, context): + return processor.response() + + # WHEN + result = lambda_handler(event, {}) + + # THEN only failed messages should originate from MessageGroupID 2 + assert len(result["batchItemFailures"]) == 2 + assert result["batchItemFailures"][0]["itemIdentifier"] == third_record.message_id + assert result["batchItemFailures"][1]["itemIdentifier"] == fourth_record.message_id + + def test_async_batch_processor_middleware_success_only(sqs_event_factory, async_record_handler): # GIVEN first_record = SQSRecord(sqs_event_factory("success")) From de5d7566e7962ec23508b2c0b78975cc37eb6f07 Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Thu, 14 Mar 2024 11:06:46 +0000 Subject: [PATCH 02/13] Adding docstring --- .../utilities/batch/sqs_fifo_partial_processor.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py b/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py index e7c24be488b..6d460a16b91 100644 --- 
a/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py +++ b/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py @@ -58,6 +58,17 @@ def lambda_handler(event, context: LambdaContext): ) def __init__(self, model: Optional["BatchSqsTypeModel"] = None, return_on_first_error: bool = True): + """ + Initialize the SqsFifoProcessor. + + Parameters + ---------- + model: Optional["BatchSqsTypeModel"] + An optional model for batch processing. + return_on_first_error: bool + Flag to determine whether to return on the first error encountered. Default is True + + """ self.return_on_first_error = return_on_first_error super().__init__(EventType.SQS, model) From 1ad32f519d366f2b5fec67c119ad1226a137b03e Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Thu, 14 Mar 2024 18:36:41 +0000 Subject: [PATCH 03/13] Refactoring logic to skip execution when messages are part of a groupid with failed messages --- .../batch/sqs_fifo_partial_processor.py | 46 ++++++++++----- docs/utilities/batch.md | 4 +- tests/functional/test_utilities_batch.py | 56 +++++++++++++++++-- 3 files changed, 86 insertions(+), 20 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py b/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py index 6d460a16b91..cf7634d3ac9 100644 --- a/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py +++ b/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py @@ -1,8 +1,11 @@ +import logging from typing import List, Optional, Tuple from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType from aws_lambda_powertools.utilities.batch.types import BatchSqsTypeModel +logger = logging.getLogger(__name__) + class SQSFifoCircuitBreakerError(Exception): """ @@ -57,7 +60,7 @@ def lambda_handler(event, context: LambdaContext): None, ) - def __init__(self, model: Optional["BatchSqsTypeModel"] = None, return_on_first_error: bool = True): + def __init__(self, model: Optional["BatchSqsTypeModel"] = None, skip_group_on_error: bool = False): """ Initialize the SqsFifoProcessor. @@ -65,11 +68,12 @@ def __init__(self, model: Optional["BatchSqsTypeModel"] = None, return_on_first_ ---------- model: Optional["BatchSqsTypeModel"] An optional model for batch processing. - return_on_first_error: bool - Flag to determine whether to return on the first error encountered. Default is True + skip_group_on_error: bool + # TODO: Alterar + Determine whether to return on the first error encountered. Default is True """ - self.return_on_first_error = return_on_first_error + self._skip_group_on_error = skip_group_on_error super().__init__(EventType.SQS, model) def process(self) -> List[Tuple]: @@ -78,25 +82,41 @@ def process(self) -> List[Tuple]: the process is short-circuited, and the remaining messages are reported as failed items. 
""" result: List[Tuple] = [] + skip_messages_group_id: List = [] for i, record in enumerate(self.records): # If we have failed messages and we are set to return on the first error, # short circuit the process and return the remaining messages as failed items - if self.fail_messages and self.return_on_first_error: + if self.fail_messages and not self._skip_group_on_error: + logger.debug("Processing of failed messages stopped due to the 'skip_group_on_error' is set to False") return self._short_circuit_processing(i, result) - # Process the current record + msg_id = record.get("messageId") + + # skip_group_on_error is True: + # Skip processing the current message if its ID belongs to a group with failed messages + if msg_id in skip_messages_group_id: + logger.debug( + f"Skipping message with ID '{msg_id}' as it is part of a group containing failed messages.", + ) + continue + processed_messages = self._process_record(record) - # If a processed message fail, + # If a processed message fail and skip_group_on_error is True, # mark subsequent messages from the same MessageGroupId as skipped - if processed_messages[0] == "fail": + if processed_messages[0] == "fail" and self._skip_group_on_error: + _attributes_record = record.get("attributes", {}) for subsequent_record in self.records[i + 1 :]: - if subsequent_record.get("attributes", {}).get("MessageGroupId") == record.get( - "attributes", - {}, - ).get("MessageGroupId"): - continue # Skip subsequent message from the same MessageGroupId + _attributes = subsequent_record.get("attributes", {}) + if _attributes.get("MessageGroupId") == _attributes_record.get("MessageGroupId"): + skip_messages_group_id.append(subsequent_record.get("messageId")) + data = self._to_batch_type( + record=subsequent_record, + event_type=self.event_type, + model=self.model, + ) + result.append(self.failure_handler(record=data, exception=self.circuit_breaker_exc)) # Append the processed message normally result.append(processed_messages) diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index ce75e94efc8..7b0aaf5ea93 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -141,7 +141,9 @@ Processing batches from SQS works in three stages: #### FIFO queues -When working with [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html){target="_blank"}, it's important to know that a batch sent from SQS to Lambda can include multiple messages from different group IDs. +When working with [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html){target="_blank"}, you should know that a batch may include messages from different group IDs. + +By default, we will stop processing at the first failure and mark unprocessed messages as failed to preserve ordering. By default, message processing halts after the initial failure, returning all failed and unprocessed messages in `batchItemFailures` to preserve the ordering of messages in your queue. However, customers can opt to continue processing messages and retrieve failed messages within a message group ID by setting `return_on_first_error` to False. 
diff --git a/tests/functional/test_utilities_batch.py b/tests/functional/test_utilities_batch.py index 00cbace7d2c..fcb613fbdfb 100644 --- a/tests/functional/test_utilities_batch.py +++ b/tests/functional/test_utilities_batch.py @@ -28,6 +28,7 @@ from aws_lambda_powertools.utilities.parser.models import ( DynamoDBStreamChangedRecordModel, DynamoDBStreamRecordModel, + SqsRecordModel, ) from aws_lambda_powertools.utilities.parser.types import Literal from tests.functional.batch.sample_models import ( @@ -728,13 +729,13 @@ def lambda_handler(event, context): assert result["batchItemFailures"][1]["itemIdentifier"] == third_record.message_id -def test_sqs_fifo_batch_processor_middleware_without_first_failure(sqs_event_fifo_factory, record_handler): +def test_sqs_fifo_batch_processor_middleware_with_skip_group_on_error(sqs_event_fifo_factory, record_handler): # GIVEN a batch of 5 records with 3 different MessageGroupID first_record = SQSRecord(sqs_event_fifo_factory("success", "1")) second_record = SQSRecord(sqs_event_fifo_factory("success", "1")) third_record = SQSRecord(sqs_event_fifo_factory("fail", "2")) - fourth_record = SQSRecord(sqs_event_fifo_factory("fail", "2")) - fifth_record = SQSRecord(sqs_event_fifo_factory("success", "3")) + fourth_record = SQSRecord(sqs_event_fifo_factory("success", "2")) + fifth_record = SQSRecord(sqs_event_fifo_factory("fail", "3")) event = { "Records": [ first_record.raw_event, @@ -746,7 +747,7 @@ def test_sqs_fifo_batch_processor_middleware_without_first_failure(sqs_event_fif } # WHEN the FIFO processor is set to continue processing even after encountering errors in specific MessageGroupID - processor = SqsFifoPartialProcessor(return_on_first_error=False) + processor = SqsFifoPartialProcessor(skip_group_on_error=True) @batch_processor(record_handler=record_handler, processor=processor) def lambda_handler(event, context): @@ -755,10 +756,53 @@ def lambda_handler(event, context): # WHEN result = lambda_handler(event, {}) - # THEN only failed messages should originate from MessageGroupID 2 - assert len(result["batchItemFailures"]) == 2 + # THEN only failed messages should originate from MessageGroupID 3 + assert len(result["batchItemFailures"]) == 3 + assert result["batchItemFailures"][0]["itemIdentifier"] == third_record.message_id + assert result["batchItemFailures"][1]["itemIdentifier"] == fourth_record.message_id + assert result["batchItemFailures"][2]["itemIdentifier"] == fifth_record.message_id + + +def test_sqs_fifo_batch_processor_middleware_with_skip_group_on_error_and_model(sqs_event_fifo_factory, record_handler): + # GIVEN a batch of 5 records with 3 different MessageGroupID + first_record = SQSRecord(sqs_event_fifo_factory("success", "1")) + second_record = SQSRecord(sqs_event_fifo_factory("success", "1")) + third_record = SQSRecord(sqs_event_fifo_factory("fail", "2")) + fourth_record = SQSRecord(sqs_event_fifo_factory("success", "2")) + fifth_record = SQSRecord(sqs_event_fifo_factory("fail", "3")) + event = { + "Records": [ + first_record.raw_event, + second_record.raw_event, + third_record.raw_event, + fourth_record.raw_event, + fifth_record.raw_event, + ], + } + + class OrderSqsRecord(SqsRecordModel): + receiptHandle: str + + # WHEN the FIFO processor is set to continue processing even after encountering errors in specific MessageGroupID + # WHEN processor is using a Pydantic Model we must be able to access MessageGroupID property + processor = SqsFifoPartialProcessor(skip_group_on_error=True, model=OrderSqsRecord) + + def record_handler(record: 
OrderSqsRecord): + if record.body == "fail": + raise ValueError("blah") + + @batch_processor(record_handler=record_handler, processor=processor) + def lambda_handler(event, context): + return processor.response() + + # WHEN + result = lambda_handler(event, {}) + + # THEN only failed messages should originate from MessageGroupID 3 + assert len(result["batchItemFailures"]) == 3 assert result["batchItemFailures"][0]["itemIdentifier"] == third_record.message_id assert result["batchItemFailures"][1]["itemIdentifier"] == fourth_record.message_id + assert result["batchItemFailures"][2]["itemIdentifier"] == fifth_record.message_id def test_async_batch_processor_middleware_success_only(sqs_event_factory, async_record_handler): From 8084a961061524dc2ff139ca84043314748df6f8 Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Thu, 14 Mar 2024 18:44:46 +0000 Subject: [PATCH 04/13] Reducing complexity --- .../batch/sqs_fifo_partial_processor.py | 37 +++++++++++++------ 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py b/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py index cf7634d3ac9..fa0e6eaf88b 100644 --- a/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py +++ b/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py @@ -1,5 +1,5 @@ import logging -from typing import List, Optional, Tuple +from typing import Dict, List, Optional, Tuple from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType from aws_lambda_powertools.utilities.batch.types import BatchSqsTypeModel @@ -106,23 +106,36 @@ def process(self) -> List[Tuple]: # If a processed message fail and skip_group_on_error is True, # mark subsequent messages from the same MessageGroupId as skipped if processed_messages[0] == "fail" and self._skip_group_on_error: - _attributes_record = record.get("attributes", {}) - for subsequent_record in self.records[i + 1 :]: - _attributes = subsequent_record.get("attributes", {}) - if _attributes.get("MessageGroupId") == _attributes_record.get("MessageGroupId"): - skip_messages_group_id.append(subsequent_record.get("messageId")) - data = self._to_batch_type( - record=subsequent_record, - event_type=self.event_type, - model=self.model, - ) - result.append(self.failure_handler(record=data, exception=self.circuit_breaker_exc)) + self._process_failed_subsequent_messages(record, i, skip_messages_group_id, result) # Append the processed message normally result.append(processed_messages) return result + def _process_failed_subsequent_messages( + self, + record: Dict, + i: int, + skip_messages_group_id: List, + result: List[Tuple], + ) -> None: + """ + Process failed subsequent messages from the same MessageGroupId and mark them as skipped. + """ + _attributes_record = record.get("attributes", {}) + + for subsequent_record in self.records[i + 1 :]: + _attributes = subsequent_record.get("attributes", {}) + if _attributes.get("MessageGroupId") == _attributes_record.get("MessageGroupId"): + skip_messages_group_id.append(subsequent_record.get("messageId")) + data = self._to_batch_type( + record=subsequent_record, + event_type=self.event_type, + model=self.model, + ) + result.append(self.failure_handler(record=data, exception=self.circuit_breaker_exc)) + def _short_circuit_processing(self, first_failure_index: int, result: List[Tuple]) -> List[Tuple]: """ Starting from the first failure index, fail all the remaining messages, and append them to the result list. 
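
[Annotation] The semantics established by patches 01-04 are easiest to see in isolation. The sketch below is plain Python, not the library implementation; the record shape simply mirrors the SQS FIFO events built by the `sqs_event_fifo_factory` fixture in the tests above:

```python
from typing import Callable, Dict, List, Set


def skip_group_rule(records: List[Dict], handler: Callable[[Dict], None]) -> List[str]:
    """Return the messageIds reported as failures when skip_group_on_error=True."""
    failed_group_ids: Set[str] = set()
    batch_item_failures: List[str] = []

    for record in records:
        group_id = record.get("attributes", {}).get("MessageGroupId")

        # Records in a group that already failed are never handled;
        # they are reported straight back to SQS as batch item failures.
        if group_id in failed_group_ids:
            batch_item_failures.append(record["messageId"])
            continue

        try:
            handler(record)
        except Exception:
            failed_group_ids.add(group_id)
            batch_item_failures.append(record["messageId"])

    return batch_item_failures


def handler(record: Dict) -> None:
    if record["body"] == "fail":
        raise ValueError("simulated processing error")


records = [
    {"messageId": "m1", "attributes": {"MessageGroupId": "1"}, "body": "success"},
    {"messageId": "m2", "attributes": {"MessageGroupId": "2"}, "body": "fail"},
    {"messageId": "m3", "attributes": {"MessageGroupId": "2"}, "body": "success"},
    {"messageId": "m4", "attributes": {"MessageGroupId": "3"}, "body": "success"},
]

# m3 is skipped because its group already failed; groups 1 and 3 still succeed.
assert skip_group_rule(records, handler) == ["m2", "m3"]
```
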
From 1b13512d42a86e7f1b56ae930cc2d64ec8b3d44a Mon Sep 17 00:00:00 2001
From: Leandro Damascena
Date: Thu, 14 Mar 2024 22:43:27 +0000
Subject: [PATCH 05/13] Adding documentation

---
 docs/utilities/batch.md                       | 40 ++++++++++++++++---
 .../getting_started_sqs_fifo_skip_on_error.py | 23 +++++++++++
 2 files changed, 57 insertions(+), 6 deletions(-)
 create mode 100644 examples/batch_processing/src/getting_started_sqs_fifo_skip_on_error.py

diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md
index 7b0aaf5ea93..a5e66d00d1c 100644
--- a/docs/utilities/batch.md
+++ b/docs/utilities/batch.md
@@ -143,12 +143,9 @@ Processing batches from SQS works in three stages:
 
 When working with [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html){target="_blank"}, you should know that a batch may include messages from different group IDs.
 
-By default, we will stop processing at the first failure and mark unprocessed messages as failed to preserve ordering.
+By default, we will stop processing at the first failure and mark unprocessed messages as failed to preserve ordering. However, this behavior may not be optimal for customers who wish to proceed with processing messages from a different group ID.
 
-By default, message processing halts after the initial failure, returning all failed and unprocessed messages in `batchItemFailures` to preserve the ordering of messages in your queue. However, customers can opt to continue processing messages and retrieve failed messages within a message group ID by setting `return_on_first_error` to False.
-
-???+ notice "Having problems with DLQ?"
-    `AsyncBatchProcessor` uses `asyncio.gather`. This might cause [side effects and reach trace limits at high concurrency](../core/tracer.md#concurrent-asynchronous-functions){target="_blank"}.
+Enable the `skip_group_on_error` option for seamless processing of messages from various group IDs. This setup ensures that messages from a failed group ID are sent back to SQS, enabling uninterrupted processing of messages from the subsequent group ID.
 
 === "Recommended"
 
@@ -170,6 +167,12 @@
     --8<-- "examples/batch_processing/src/getting_started_sqs_fifo_decorator.py"
     ```
 
+=== "Enabling skip_group_on_error flag"
+
+    ```python hl_lines="2-6 9 23"
+    --8<-- "examples/batch_processing/src/getting_started_sqs_fifo_skip_on_error.py"
+    ```
+
 ### Processing messages from Kinesis
 
 Processing batches from Kinesis works in three stages:
@@ -317,7 +320,7 @@
 
 > Read more about [Batch Failure Reporting feature in AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank"}.
 
-Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues) with SQS FIFO queues.
+Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues) with SQS FIFO queues without the `skip_group_on_error` flag.
 
 <center>
 ```mermaid
 sequenceDiagram
     autonumber
     participant SQS queue
     participant Lambda service
     participant Lambda function
     Lambda service->>SQS queue: Poll
     Lambda service->>Lambda function: Invoke (batch event)
     activate Lambda function
     Lambda function-->Lambda function: Process 2 out of 10 batch items
     Lambda function--xLambda function: Fail on 3rd batch item
     Lambda function->>Lambda service: Report 3rd batch item and unprocessed messages as failure
     deactivate Lambda function
     activate SQS queue
     Lambda service->>SQS queue: Delete successful messages processed
     SQS queue-->>SQS queue: Failed messages return
     deactivate SQS queue
 ```
 <i>SQS FIFO mechanism with Batch Item Failures</i>
 </center>
 
+Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues) with SQS FIFO queues with the `skip_group_on_error` flag.
+
+<center>
+```mermaid
+sequenceDiagram
+    autonumber
+    participant SQS queue
+    participant Lambda service
+    participant Lambda function
+    Lambda service->>SQS queue: Poll
+    Lambda service->>Lambda function: Invoke (batch event)
+    activate Lambda function
+    Lambda function-->Lambda function: Process 2 out of 10 batch items
+    Lambda function--xLambda function: Fail on 3rd batch item
+    Lambda function-->Lambda function: Process messages from another MessageGroupID
+    Lambda function->>Lambda service: Report 3rd batch item and all messages within the same MessageGroupID as failure
+    deactivate Lambda function
+    activate SQS queue
+    Lambda service->>SQS queue: Delete successful messages processed
+    SQS queue-->>SQS queue: Failed messages return
+    deactivate SQS queue
+```
+<i>SQS FIFO mechanism with Batch Item Failures</i>
+</center>
+
+ #### Kinesis and DynamoDB Streams > Read more about [Batch Failure Reporting feature](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting){target="_blank"}. diff --git a/examples/batch_processing/src/getting_started_sqs_fifo_skip_on_error.py b/examples/batch_processing/src/getting_started_sqs_fifo_skip_on_error.py new file mode 100644 index 00000000000..83015483d1f --- /dev/null +++ b/examples/batch_processing/src/getting_started_sqs_fifo_skip_on_error.py @@ -0,0 +1,23 @@ +from aws_lambda_powertools import Logger, Tracer +from aws_lambda_powertools.utilities.batch import ( + SqsFifoPartialProcessor, + process_partial_response, +) +from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord +from aws_lambda_powertools.utilities.typing import LambdaContext + +processor = SqsFifoPartialProcessor(skip_group_on_error=True) +tracer = Tracer() +logger = Logger() + + +@tracer.capture_method +def record_handler(record: SQSRecord): + payload: str = record.json_body # if json string data, otherwise record.body for str + logger.info(payload) + + +@logger.inject_lambda_context +@tracer.capture_lambda_handler +def lambda_handler(event, context: LambdaContext): + return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context) From 6ad583b2cd5c63480a5948970ea2d08beae18409 Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Mon, 18 Mar 2024 09:37:14 +0000 Subject: [PATCH 06/13] Addressing Mathieu's feedback --- .../batch/sqs_fifo_partial_processor.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py b/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py index fa0e6eaf88b..77ad6be4918 100644 --- a/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py +++ b/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py @@ -69,8 +69,8 @@ def __init__(self, model: Optional["BatchSqsTypeModel"] = None, skip_group_on_er model: Optional["BatchSqsTypeModel"] An optional model for batch processing. skip_group_on_error: bool - # TODO: Alterar - Determine whether to return on the first error encountered. Default is True + Determines whether to exclusively skip messages from the MessageGroupID that encountered processing failures + Default is False """ self._skip_group_on_error = skip_group_on_error @@ -82,7 +82,7 @@ def process(self) -> List[Tuple]: the process is short-circuited, and the remaining messages are reported as failed items. 
""" result: List[Tuple] = [] - skip_messages_group_id: List = [] + skip_message_ids: List = [] for i, record in enumerate(self.records): # If we have failed messages and we are set to return on the first error, @@ -95,21 +95,21 @@ def process(self) -> List[Tuple]: # skip_group_on_error is True: # Skip processing the current message if its ID belongs to a group with failed messages - if msg_id in skip_messages_group_id: + if msg_id in skip_message_ids: logger.debug( f"Skipping message with ID '{msg_id}' as it is part of a group containing failed messages.", ) continue - processed_messages = self._process_record(record) + processed_message = self._process_record(record) # If a processed message fail and skip_group_on_error is True, # mark subsequent messages from the same MessageGroupId as skipped - if processed_messages[0] == "fail" and self._skip_group_on_error: - self._process_failed_subsequent_messages(record, i, skip_messages_group_id, result) + if processed_message[0] == "fail" and self._skip_group_on_error: + self._process_failed_subsequent_messages(record, i, skip_message_ids, result) # Append the processed message normally - result.append(processed_messages) + result.append(processed_message) return result From a43bbdbc9b54a92adf6d330ddc30abbbc20af073 Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Mon, 18 Mar 2024 10:14:13 +0000 Subject: [PATCH 07/13] Addressing Mathieu's feedback --- .../utilities/batch/sqs_fifo_partial_processor.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py b/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py index 77ad6be4918..4d6dde6ec60 100644 --- a/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py +++ b/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py @@ -78,8 +78,13 @@ def __init__(self, model: Optional["BatchSqsTypeModel"] = None, skip_group_on_er def process(self) -> List[Tuple]: """ - Call instance's handler for each record. When the first failed message is detected, - the process is short-circuited, and the remaining messages are reported as failed items. + Call instance's handler for each record. + + If skip_group_on_error is set to False, the process short-circuits upon detecting the first failed message, + and the remaining messages are reported as failed items. + + If skip_group_on_error is set to True, upon encountering the first failed message for a specific MessageGroupID, + all messages from that MessageGroupID are skipped and reported as failed items. 
""" result: List[Tuple] = [] skip_message_ids: List = [] From 28d68af3f0f7f05c1af84ee9acbbbf8e7f1d8358 Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Tue, 19 Mar 2024 09:36:27 +0000 Subject: [PATCH 08/13] Addressing Ruben's feedback --- .../utilities/batch/sqs_fifo_partial_processor.py | 2 +- docs/utilities/batch.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py b/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py index 4d6dde6ec60..e75f241ce69 100644 --- a/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py +++ b/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py @@ -93,7 +93,7 @@ def process(self) -> List[Tuple]: # If we have failed messages and we are set to return on the first error, # short circuit the process and return the remaining messages as failed items if self.fail_messages and not self._skip_group_on_error: - logger.debug("Processing of failed messages stopped due to the 'skip_group_on_error' is set to False") + logger.debug("Processing of failed messages stopped because 'skip_group_on_error' is False") return self._short_circuit_processing(i, result) msg_id = record.get("messageId") diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index a5e66d00d1c..e5241d516e8 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -141,7 +141,7 @@ Processing batches from SQS works in three stages: #### FIFO queues -When working with [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html){target="_blank"}, you should know that a batch may include messages from different group IDs. +When working with [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html){target="_blank"}, a batch may include messages from different group IDs. By default, we will stop processing at the first failure and mark unprocessed messages as failed to preserve ordering. However, this behavior may not be optimal for customers who wish to proceed with processing messages from a different group ID. 
From 1ca372c786abb3d799f6d5c02339ba5316af26bc Mon Sep 17 00:00:00 2001 From: Ruben Fonseca Date: Tue, 19 Mar 2024 14:38:10 +0100 Subject: [PATCH 09/13] chore: refactor --- .../batch/sqs_fifo_partial_processor.py | 129 +++++++----------- 1 file changed, 51 insertions(+), 78 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py b/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py index e75f241ce69..504f5ea11fb 100644 --- a/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py +++ b/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py @@ -1,7 +1,7 @@ import logging -from typing import Dict, List, Optional, Tuple +from typing import Optional, Set, override -from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType +from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, ExceptionInfo, FailureResponse from aws_lambda_powertools.utilities.batch.types import BatchSqsTypeModel logger = logging.getLogger(__name__) @@ -15,6 +15,14 @@ class SQSFifoCircuitBreakerError(Exception): pass +class SQSFifoMessageGroupCircuitBreakerError(Exception): + """ + Signals a record not processed due to the SQS FIFO message group processing being interrupted + """ + + pass + + class SqsFifoPartialProcessor(BatchProcessor): """Process native partial responses from SQS FIFO queues. @@ -60,6 +68,12 @@ def lambda_handler(event, context: LambdaContext): None, ) + group_circuit_breaker_exc = ( + SQSFifoMessageGroupCircuitBreakerError, + SQSFifoMessageGroupCircuitBreakerError("A previous record from this message group failed processing"), + None, + ) + def __init__(self, model: Optional["BatchSqsTypeModel"] = None, skip_group_on_error: bool = False): """ Initialize the SqsFifoProcessor. @@ -70,86 +84,45 @@ def __init__(self, model: Optional["BatchSqsTypeModel"] = None, skip_group_on_er An optional model for batch processing. skip_group_on_error: bool Determines whether to exclusively skip messages from the MessageGroupID that encountered processing failures - Default is False + Default is False. """ - self._skip_group_on_error = skip_group_on_error + self._skip_group_on_error: bool = skip_group_on_error + self._current_group_id = None + self._failed_group_ids: Set[str] = set() super().__init__(EventType.SQS, model) - def process(self) -> List[Tuple]: - """ - Call instance's handler for each record. - - If skip_group_on_error is set to False, the process short-circuits upon detecting the first failed message, - and the remaining messages are reported as failed items. - - If skip_group_on_error is set to True, upon encountering the first failed message for a specific MessageGroupID, - all messages from that MessageGroupID are skipped and reported as failed items. 
- """ - result: List[Tuple] = [] - skip_message_ids: List = [] - - for i, record in enumerate(self.records): - # If we have failed messages and we are set to return on the first error, - # short circuit the process and return the remaining messages as failed items - if self.fail_messages and not self._skip_group_on_error: - logger.debug("Processing of failed messages stopped because 'skip_group_on_error' is False") - return self._short_circuit_processing(i, result) - - msg_id = record.get("messageId") - - # skip_group_on_error is True: - # Skip processing the current message if its ID belongs to a group with failed messages - if msg_id in skip_message_ids: - logger.debug( - f"Skipping message with ID '{msg_id}' as it is part of a group containing failed messages.", - ) - continue - - processed_message = self._process_record(record) - - # If a processed message fail and skip_group_on_error is True, - # mark subsequent messages from the same MessageGroupId as skipped - if processed_message[0] == "fail" and self._skip_group_on_error: - self._process_failed_subsequent_messages(record, i, skip_message_ids, result) - - # Append the processed message normally - result.append(processed_message) - - return result - - def _process_failed_subsequent_messages( - self, - record: Dict, - i: int, - skip_messages_group_id: List, - result: List[Tuple], - ) -> None: - """ - Process failed subsequent messages from the same MessageGroupId and mark them as skipped. - """ - _attributes_record = record.get("attributes", {}) - - for subsequent_record in self.records[i + 1 :]: - _attributes = subsequent_record.get("attributes", {}) - if _attributes.get("MessageGroupId") == _attributes_record.get("MessageGroupId"): - skip_messages_group_id.append(subsequent_record.get("messageId")) - data = self._to_batch_type( - record=subsequent_record, - event_type=self.event_type, - model=self.model, - ) - result.append(self.failure_handler(record=data, exception=self.circuit_breaker_exc)) - - def _short_circuit_processing(self, first_failure_index: int, result: List[Tuple]) -> List[Tuple]: - """ - Starting from the first failure index, fail all the remaining messages, and append them to the result list. - """ - remaining_records = self.records[first_failure_index:] - for remaining_record in remaining_records: - data = self._to_batch_type(record=remaining_record, event_type=self.event_type, model=self.model) - result.append(self.failure_handler(record=data, exception=self.circuit_breaker_exc)) - return result + @override + def _process_record(self, record): + self._current_group_id = record.get("attributes", {}).get("MessageGroupId") + + # Short-circuits the process if: + # - There are failed messages, OR + # - The `skip_group_on_error` option is on, and the current message is part of a failed group. + fail_group_id = self._skip_group_on_error and self._current_group_id in self._failed_group_ids + if self.fail_messages or fail_group_id: + return self.failure_handler( + record=self._to_batch_type(record, event_type=self.event_type, model=self.model), + exception=self.group_circuit_breaker_exc if self._skip_group_on_error else self.circuit_breaker_exc, + ) + + return super()._process_record(record) + + @override + def failure_handler(self, record, exception: ExceptionInfo) -> FailureResponse: + # If we are failing a message and the `skip_group_on_error` is on, we store the failed group ID + # This way, future messages with the same group ID will be failed automatically. 
+ if self._skip_group_on_error and self._current_group_id: + self._failed_group_ids.add(self._current_group_id) + + return super().failure_handler(record, exception) + + @override + def _clean(self): + self._failed_group_ids.clear() + self._current_group_id = None + + super()._clean() async def _async_process_record(self, record: dict): raise NotImplementedError() From b3fdd573b5b5f27b0c191089b253003698596dd9 Mon Sep 17 00:00:00 2001 From: Ruben Fonseca Date: Tue, 19 Mar 2024 14:40:47 +0100 Subject: [PATCH 10/13] chore: refactor --- .../utilities/batch/sqs_fifo_partial_processor.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py b/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py index 504f5ea11fb..f541a7be930 100644 --- a/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py +++ b/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py @@ -1,5 +1,5 @@ import logging -from typing import Optional, Set, override +from typing import Optional, Set from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, ExceptionInfo, FailureResponse from aws_lambda_powertools.utilities.batch.types import BatchSqsTypeModel @@ -92,7 +92,6 @@ def __init__(self, model: Optional["BatchSqsTypeModel"] = None, skip_group_on_er self._failed_group_ids: Set[str] = set() super().__init__(EventType.SQS, model) - @override def _process_record(self, record): self._current_group_id = record.get("attributes", {}).get("MessageGroupId") @@ -108,7 +107,6 @@ def _process_record(self, record): return super()._process_record(record) - @override def failure_handler(self, record, exception: ExceptionInfo) -> FailureResponse: # If we are failing a message and the `skip_group_on_error` is on, we store the failed group ID # This way, future messages with the same group ID will be failed automatically. 
@@ -117,7 +115,6 @@ def failure_handler(self, record, exception: ExceptionInfo) -> FailureResponse: return super().failure_handler(record, exception) - @override def _clean(self): self._failed_group_ids.clear() self._current_group_id = None From 8c4df8e3ad78848c2eb2db0a1b42a35a8b4e8ec7 Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Wed, 20 Mar 2024 08:32:10 +0000 Subject: [PATCH 11/13] Adding temp test to help Ruben test it --- tests/functional/test_utilities_batch.py | 37 ++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/tests/functional/test_utilities_batch.py b/tests/functional/test_utilities_batch.py index fcb613fbdfb..a22e6e852a4 100644 --- a/tests/functional/test_utilities_batch.py +++ b/tests/functional/test_utilities_batch.py @@ -763,6 +763,43 @@ def lambda_handler(event, context): assert result["batchItemFailures"][2]["itemIdentifier"] == fifth_record.message_id +def test_sqs_fifo_batch_processor_middleware_with_skip_group_on_error_first_message_fail( + sqs_event_fifo_factory, + record_handler, +): + # GIVEN a batch of 5 records with 3 different MessageGroupID + first_record = SQSRecord(sqs_event_fifo_factory("fail", "1")) + second_record = SQSRecord(sqs_event_fifo_factory("success", "1")) + third_record = SQSRecord(sqs_event_fifo_factory("fail", "2")) + fourth_record = SQSRecord(sqs_event_fifo_factory("success", "2")) + fifth_record = SQSRecord(sqs_event_fifo_factory("fail", "3")) + event = { + "Records": [ + first_record.raw_event, + second_record.raw_event, + third_record.raw_event, + fourth_record.raw_event, + fifth_record.raw_event, + ], + } + + # WHEN the FIFO processor is set to continue processing even after encountering errors in specific MessageGroupID + processor = SqsFifoPartialProcessor(skip_group_on_error=True) + + @batch_processor(record_handler=record_handler, processor=processor) + def lambda_handler(event, context): + return processor.response() + + # WHEN + result = lambda_handler(event, {}) + + # THEN only failed messages should originate from MessageGroupID 3 + assert len(result["batchItemFailures"]) == 3 + assert result["batchItemFailures"][0]["itemIdentifier"] == third_record.message_id + assert result["batchItemFailures"][1]["itemIdentifier"] == fourth_record.message_id + assert result["batchItemFailures"][2]["itemIdentifier"] == fifth_record.message_id + + def test_sqs_fifo_batch_processor_middleware_with_skip_group_on_error_and_model(sqs_event_fifo_factory, record_handler): # GIVEN a batch of 5 records with 3 different MessageGroupID first_record = SQSRecord(sqs_event_fifo_factory("success", "1")) From f8e8872e59665d010604bdac32fa354527cf7a39 Mon Sep 17 00:00:00 2001 From: Ruben Fonseca Date: Wed, 20 Mar 2024 10:10:22 +0100 Subject: [PATCH 12/13] fix: condition --- .../utilities/batch/sqs_fifo_partial_processor.py | 3 ++- tests/functional/test_utilities_batch.py | 15 ++++++++------- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py b/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py index f541a7be930..7fc985d861d 100644 --- a/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py +++ b/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py @@ -98,8 +98,9 @@ def _process_record(self, record): # Short-circuits the process if: # - There are failed messages, OR # - The `skip_group_on_error` option is on, and the current message is part of a failed group. 
+        fail_entire_batch = bool(self.fail_messages) and not self._skip_group_on_error
         fail_group_id = self._skip_group_on_error and self._current_group_id in self._failed_group_ids
-        if self.fail_messages or fail_group_id:
+        if fail_entire_batch or fail_group_id:

diff --git a/tests/functional/test_utilities_batch.py b/tests/functional/test_utilities_batch.py
index a22e6e852a4..8ea2fac7bc5 100644
--- a/tests/functional/test_utilities_batch.py
+++ b/tests/functional/test_utilities_batch.py
@@ -772,7 +772,7 @@ def test_sqs_fifo_batch_processor_middleware_with_skip_group_on_error_first_mess
     second_record = SQSRecord(sqs_event_fifo_factory("success", "1"))
     third_record = SQSRecord(sqs_event_fifo_factory("fail", "2"))
     fourth_record = SQSRecord(sqs_event_fifo_factory("success", "2"))
-    fifth_record = SQSRecord(sqs_event_fifo_factory("fail", "3"))
+    fifth_record = SQSRecord(sqs_event_fifo_factory("success", "3"))
     event = {
         "Records": [
             first_record.raw_event,
@@ -790,14 +790,15 @@ def lambda_handler(event, context):
     def lambda_handler(event, context):
         return processor.response()
 
-    # WHEN
+    # WHEN the handler is invoked
     result = lambda_handler(event, {})
 
-    # THEN only failed messages should originate from MessageGroupID 3
-    assert len(result["batchItemFailures"]) == 3
-    assert result["batchItemFailures"][0]["itemIdentifier"] == third_record.message_id
-    assert result["batchItemFailures"][1]["itemIdentifier"] == fourth_record.message_id
-    assert result["batchItemFailures"][2]["itemIdentifier"] == fifth_record.message_id
+    # THEN messages from group 1 and 2 should fail, but not group 3
+    assert len(result["batchItemFailures"]) == 4
+    assert result["batchItemFailures"][0]["itemIdentifier"] == first_record.message_id
+    assert result["batchItemFailures"][1]["itemIdentifier"] == second_record.message_id
+    assert result["batchItemFailures"][2]["itemIdentifier"] == third_record.message_id
+    assert result["batchItemFailures"][3]["itemIdentifier"] == fourth_record.message_id

From 90db3e0b7da28eca36efe3af2a5d590205d0b89c Mon Sep 17 00:00:00 2001
From: Ruben Fonseca
Date: Wed, 20 Mar 2024 10:41:09 +0100
Subject: [PATCH 13/13] chore: moved exceptions

---
 .../utilities/batch/exceptions.py             | 16 +++++++++++++++
 .../batch/sqs_fifo_partial_processor.py       | 20 ++++---------------
 2 files changed, 20 insertions(+), 16 deletions(-)

diff --git a/aws_lambda_powertools/utilities/batch/exceptions.py b/aws_lambda_powertools/utilities/batch/exceptions.py
index a3eefbb9cea..3f4075c7d2f 100644
--- a/aws_lambda_powertools/utilities/batch/exceptions.py
+++ b/aws_lambda_powertools/utilities/batch/exceptions.py
@@ -36,3 +36,19 @@ def __init__(self, msg="", child_exceptions: List[ExceptionInfo] | None = None):
     def __str__(self):
         parent_exception_str = super(BatchProcessingError, self).__str__()
         return self.format_exceptions(parent_exception_str)
+
+
+class SQSFifoCircuitBreakerError(Exception):
+    """
+    Signals a record not processed due to the SQS FIFO processing being interrupted
+    """
+
+    pass
+
+
+class SQSFifoMessageGroupCircuitBreakerError(Exception):
+    """
+    Signals a record not processed due to the SQS FIFO message group processing being interrupted
+    """
+
+    pass

diff --git a/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py b/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py
index 7fc985d861d..e54389718bc 100644
--- a/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py
+++ b/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py
@@ -2,27 +2,15 @@
 import logging
 from typing import Optional, Set
 
 from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, ExceptionInfo, FailureResponse
+from aws_lambda_powertools.utilities.batch.exceptions import (
+    SQSFifoCircuitBreakerError,
+    SQSFifoMessageGroupCircuitBreakerError,
+)
 from aws_lambda_powertools.utilities.batch.types import BatchSqsTypeModel
 
 logger = logging.getLogger(__name__)
 
 
-class SQSFifoCircuitBreakerError(Exception):
-    """
-    Signals a record not processed due to the SQS FIFO processing being interrupted
-    """
-
-    pass
-
-
-class SQSFifoMessageGroupCircuitBreakerError(Exception):
-    """
-    Signals a record not processed due to the SQS FIFO message group processing being interrupted
-    """
-
-    pass
-
-
 class SqsFifoPartialProcessor(BatchProcessor):
     """Process native partial responses from SQS FIFO queues.
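
[Annotation] After this final patch, both circuit-breaker exceptions live in `aws_lambda_powertools.utilities.batch.exceptions`. A downstream sketch of where they surface; the handler and event are placeholders, and records failed by the circuit breaker come back as ordinary batch item failures:

```python
from aws_lambda_powertools.utilities.batch import (
    SqsFifoPartialProcessor,
    process_partial_response,
)
# New import location for the circuit-breaker exception types:
from aws_lambda_powertools.utilities.batch.exceptions import (
    SQSFifoCircuitBreakerError,
    SQSFifoMessageGroupCircuitBreakerError,
)

processor = SqsFifoPartialProcessor(skip_group_on_error=True)


def record_handler(record):
    if record.body == "fail":
        raise ValueError("simulated failure")


def lambda_handler(event, context):
    # Records the processor never ran are failed internally with
    # SQSFifoCircuitBreakerError (default short-circuit) or
    # SQSFifoMessageGroupCircuitBreakerError (skip_group_on_error=True);
    # either way they appear below as itemIdentifier entries for SQS to retry.
    response = process_partial_response(
        event=event,
        record_handler=record_handler,
        processor=processor,
        context=context,
    )
    for failure in response["batchItemFailures"]:
        print(failure["itemIdentifier"])
    return response
```
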