Skip to content

Commit

Permalink
Add filter policy argument to sns subscription
Browse files Browse the repository at this point in the history
It could be interesting that SQS queue can receive only
specific messages from SNS topic.
Add `filter_policy` and `filter_policy_scope` to the
method `subscribe_to_sns_topic`. See AWS documentation
for more information about the parameters.

Ref it/org/operation_support/iaas/projects#94
  • Loading branch information
jeromef853 committed Aug 14, 2024
1 parent f162626 commit b85d494
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 2 deletions.
17 changes: 15 additions & 2 deletions src/e3/aws/troposphere/sqs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,20 @@ def add_allow_service_to_write_statement(
return self._get_queue_policy_name()

def subscribe_to_sns_topic(
self, topic_arn: str, applicant: str, delivery_policy: dict | None = None
self,
topic_arn: str,
applicant: str,
delivery_policy: dict | None = None,
filter_policy: dict | None = None,
filter_policy_scope: str | None = None,
) -> None:
"""Subscribe to SNS topic.
:param topic_arn: ARN of the topic to subscribe
:param applicant: applicant name used for the Sid statement
:param delivery_policy: The delivery policy to assign to the subscription
:param filter_policy: The filter policy JSON assigned to the subscription.
:param filter_policy_scope: The filtering scope.
"""
sub_params = {
"Endpoint": self.arn,
Expand All @@ -93,7 +100,13 @@ def subscribe_to_sns_topic(
}

if delivery_policy:
sub_params.update({"DeliveryPolicy": delivery_policy})
sub_params["DeliveryPolicy"] = delivery_policy

if filter_policy:
sub_params["FilterPolicy"] = filter_policy

if filter_policy_scope:
sub_params["FilterPolicyScope"] = filter_policy_scope

self.add_allow_service_to_write_statement(
applicant=applicant,
Expand Down
61 changes: 61 additions & 0 deletions tests/tests_e3_aws/troposphere/sqs/sqs_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,52 @@
},
}

EXPECTED_SQS_SUBSCRIPTION_WITH_FILTER_TEMPLATE = {
"Myqueue": {
"Properties": {"QueueName": "myqueue", "VisibilityTimeout": 30},
"Type": "AWS::SQS::Queue",
},
"MyqueuePolicy": {
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Sid": "SomeApplicantWriteAccess",
"Action": "sqs:SendMessage",
"Condition": {"ArnLike": {"aws:SourceArn": "some_topic_arn"}},
"Effect": "Allow",
"Principal": {"Service": "sns.amazonaws.com"},
"Resource": {"Fn::GetAtt": ["Myqueue", "Arn"]},
}
],
"Version": "2012-10-17",
},
"Queues": [{"Ref": "Myqueue"}],
},
"Type": "AWS::SQS::QueuePolicy",
},
"MyqueueSub": {
"Properties": {
"Endpoint": {"Fn::GetAtt": ["Myqueue", "Arn"]},
"Protocol": "sqs",
"TopicArn": "some_topic_arn",
"RawMessageDelivery": True,
"FilterPolicy": {
"key_a": {
"key_b": {
"key_c": [
"value_1",
"value_2",
],
},
},
},
"FilterPolicyScope": "MessageBody",
},
"Type": "AWS::SNS::Subscription",
},
}


def test_queue_default(stack: Stack) -> None:
"""Test Queue default creation."""
Expand Down Expand Up @@ -102,3 +148,18 @@ def test_allow_service_to_write_not_unique_sid(stack: Stack) -> None:
stack.add(queue)

assert str(ex.value) == "Unique Sid is required for QueuePolicy statements"


def test_subscribe_to_sns_topic_with_policy_filter(stack: Stack) -> None:
"""Test sqs subscription to sns topic with a policy filter."""
queue = Queue(name="myqueue")
queue.subscribe_to_sns_topic(
topic_arn="some_topic_arn",
applicant="SomeApplicant",
filter_policy={"key_a": {"key_b": {"key_c": ["value_1", "value_2"]}}},
filter_policy_scope="MessageBody",
)

stack.add(queue)

assert stack.export()["Resources"] == EXPECTED_SQS_SUBSCRIPTION_WITH_FILTER_TEMPLATE

0 comments on commit b85d494

Please sign in to comment.