From b85d49423a85165a35a382df1bb21c376198ed8b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Forestier?= Date: Wed, 7 Aug 2024 16:40:08 +0200 Subject: [PATCH] Add filter policy argument to sns subscription It could be interesting that SQS queue can receive only specific messages from SNS topic. Add `filter_policy` and `filter_policy_scope` to the method `subscribe_to_sns_topic`. See AWS documentation for more information about the parameters. Ref it/org/operation_support/iaas/projects#94 --- src/e3/aws/troposphere/sqs/__init__.py | 17 +++++- .../tests_e3_aws/troposphere/sqs/sqs_test.py | 61 +++++++++++++++++++ 2 files changed, 76 insertions(+), 2 deletions(-) diff --git a/src/e3/aws/troposphere/sqs/__init__.py b/src/e3/aws/troposphere/sqs/__init__.py index 7780c77..849a9b2 100644 --- a/src/e3/aws/troposphere/sqs/__init__.py +++ b/src/e3/aws/troposphere/sqs/__init__.py @@ -77,13 +77,20 @@ def add_allow_service_to_write_statement( return self._get_queue_policy_name() def subscribe_to_sns_topic( - self, topic_arn: str, applicant: str, delivery_policy: dict | None = None + self, + topic_arn: str, + applicant: str, + delivery_policy: dict | None = None, + filter_policy: dict | None = None, + filter_policy_scope: str | None = None, ) -> None: """Subscribe to SNS topic. :param topic_arn: ARN of the topic to subscribe :param applicant: applicant name used for the Sid statement :param delivery_policy: The delivery policy to assign to the subscription + :param filter_policy: The filter policy JSON assigned to the subscription. + :param filter_policy_scope: The filtering scope. """ sub_params = { "Endpoint": self.arn, @@ -93,7 +100,13 @@ def subscribe_to_sns_topic( } if delivery_policy: - sub_params.update({"DeliveryPolicy": delivery_policy}) + sub_params["DeliveryPolicy"] = delivery_policy + + if filter_policy: + sub_params["FilterPolicy"] = filter_policy + + if filter_policy_scope: + sub_params["FilterPolicyScope"] = filter_policy_scope self.add_allow_service_to_write_statement( applicant=applicant, diff --git a/tests/tests_e3_aws/troposphere/sqs/sqs_test.py b/tests/tests_e3_aws/troposphere/sqs/sqs_test.py index 9fc1fa7..62565b8 100644 --- a/tests/tests_e3_aws/troposphere/sqs/sqs_test.py +++ b/tests/tests_e3_aws/troposphere/sqs/sqs_test.py @@ -65,6 +65,52 @@ }, } +EXPECTED_SQS_SUBSCRIPTION_WITH_FILTER_TEMPLATE = { + "Myqueue": { + "Properties": {"QueueName": "myqueue", "VisibilityTimeout": 30}, + "Type": "AWS::SQS::Queue", + }, + "MyqueuePolicy": { + "Properties": { + "PolicyDocument": { + "Statement": [ + { + "Sid": "SomeApplicantWriteAccess", + "Action": "sqs:SendMessage", + "Condition": {"ArnLike": {"aws:SourceArn": "some_topic_arn"}}, + "Effect": "Allow", + "Principal": {"Service": "sns.amazonaws.com"}, + "Resource": {"Fn::GetAtt": ["Myqueue", "Arn"]}, + } + ], + "Version": "2012-10-17", + }, + "Queues": [{"Ref": "Myqueue"}], + }, + "Type": "AWS::SQS::QueuePolicy", + }, + "MyqueueSub": { + "Properties": { + "Endpoint": {"Fn::GetAtt": ["Myqueue", "Arn"]}, + "Protocol": "sqs", + "TopicArn": "some_topic_arn", + "RawMessageDelivery": True, + "FilterPolicy": { + "key_a": { + "key_b": { + "key_c": [ + "value_1", + "value_2", + ], + }, + }, + }, + "FilterPolicyScope": "MessageBody", + }, + "Type": "AWS::SNS::Subscription", + }, +} + def test_queue_default(stack: Stack) -> None: """Test Queue default creation.""" @@ -102,3 +148,18 @@ def test_allow_service_to_write_not_unique_sid(stack: Stack) -> None: stack.add(queue) assert str(ex.value) == "Unique Sid is required for QueuePolicy statements" + + +def test_subscribe_to_sns_topic_with_policy_filter(stack: Stack) -> None: + """Test sqs subscription to sns topic with a policy filter.""" + queue = Queue(name="myqueue") + queue.subscribe_to_sns_topic( + topic_arn="some_topic_arn", + applicant="SomeApplicant", + filter_policy={"key_a": {"key_b": {"key_c": ["value_1", "value_2"]}}}, + filter_policy_scope="MessageBody", + ) + + stack.add(queue) + + assert stack.export()["Resources"] == EXPECTED_SQS_SUBSCRIPTION_WITH_FILTER_TEMPLATE