diff --git a/src/e3/aws/troposphere/sqs/__init__.py b/src/e3/aws/troposphere/sqs/__init__.py index 7780c77..849a9b2 100644 --- a/src/e3/aws/troposphere/sqs/__init__.py +++ b/src/e3/aws/troposphere/sqs/__init__.py @@ -77,13 +77,20 @@ def add_allow_service_to_write_statement( return self._get_queue_policy_name() def subscribe_to_sns_topic( - self, topic_arn: str, applicant: str, delivery_policy: dict | None = None + self, + topic_arn: str, + applicant: str, + delivery_policy: dict | None = None, + filter_policy: dict | None = None, + filter_policy_scope: str | None = None, ) -> None: """Subscribe to SNS topic. :param topic_arn: ARN of the topic to subscribe :param applicant: applicant name used for the Sid statement :param delivery_policy: The delivery policy to assign to the subscription + :param filter_policy: The filter policy JSON assigned to the subscription. + :param filter_policy_scope: The filtering scope. """ sub_params = { "Endpoint": self.arn, @@ -93,7 +100,13 @@ def subscribe_to_sns_topic( } if delivery_policy: - sub_params.update({"DeliveryPolicy": delivery_policy}) + sub_params["DeliveryPolicy"] = delivery_policy + + if filter_policy: + sub_params["FilterPolicy"] = filter_policy + + if filter_policy_scope: + sub_params["FilterPolicyScope"] = filter_policy_scope self.add_allow_service_to_write_statement( applicant=applicant, diff --git a/tests/tests_e3_aws/troposphere/sqs/sqs_test.py b/tests/tests_e3_aws/troposphere/sqs/sqs_test.py index 9fc1fa7..62565b8 100644 --- a/tests/tests_e3_aws/troposphere/sqs/sqs_test.py +++ b/tests/tests_e3_aws/troposphere/sqs/sqs_test.py @@ -65,6 +65,52 @@ }, } +EXPECTED_SQS_SUBSCRIPTION_WITH_FILTER_TEMPLATE = { + "Myqueue": { + "Properties": {"QueueName": "myqueue", "VisibilityTimeout": 30}, + "Type": "AWS::SQS::Queue", + }, + "MyqueuePolicy": { + "Properties": { + "PolicyDocument": { + "Statement": [ + { + "Sid": "SomeApplicantWriteAccess", + "Action": "sqs:SendMessage", + "Condition": {"ArnLike": {"aws:SourceArn": "some_topic_arn"}}, + "Effect": "Allow", + "Principal": {"Service": "sns.amazonaws.com"}, + "Resource": {"Fn::GetAtt": ["Myqueue", "Arn"]}, + } + ], + "Version": "2012-10-17", + }, + "Queues": [{"Ref": "Myqueue"}], + }, + "Type": "AWS::SQS::QueuePolicy", + }, + "MyqueueSub": { + "Properties": { + "Endpoint": {"Fn::GetAtt": ["Myqueue", "Arn"]}, + "Protocol": "sqs", + "TopicArn": "some_topic_arn", + "RawMessageDelivery": True, + "FilterPolicy": { + "key_a": { + "key_b": { + "key_c": [ + "value_1", + "value_2", + ], + }, + }, + }, + "FilterPolicyScope": "MessageBody", + }, + "Type": "AWS::SNS::Subscription", + }, +} + def test_queue_default(stack: Stack) -> None: """Test Queue default creation.""" @@ -102,3 +148,18 @@ def test_allow_service_to_write_not_unique_sid(stack: Stack) -> None: stack.add(queue) assert str(ex.value) == "Unique Sid is required for QueuePolicy statements" + + +def test_subscribe_to_sns_topic_with_policy_filter(stack: Stack) -> None: + """Test sqs subscription to sns topic with a policy filter.""" + queue = Queue(name="myqueue") + queue.subscribe_to_sns_topic( + topic_arn="some_topic_arn", + applicant="SomeApplicant", + filter_policy={"key_a": {"key_b": {"key_c": ["value_1", "value_2"]}}}, + filter_policy_scope="MessageBody", + ) + + stack.add(queue) + + assert stack.export()["Resources"] == EXPECTED_SQS_SUBSCRIPTION_WITH_FILTER_TEMPLATE