Skip to content

Commit

Permalink
Merge pull request #251 from jeromef853/master
Browse files Browse the repository at this point in the history
Add a method to subscribe SQS Queue to SNS Topic
  • Loading branch information
pierretr authored Jun 20, 2024
2 parents 18b6e76 + 9b33369 commit 2f687cf
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 3 deletions.
4 changes: 3 additions & 1 deletion src/e3/aws/troposphere/sns/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ def add_lambda_subscription(
"Endpoint": function.arn,
"Protocol": "lambda",
"TopicArn": self.arn,
"DeliveryPolicy": delivery_policy,
}

if delivery_policy:
sub_params.update({"DeliveryPolicy": delivery_policy})

self.optional_resources.extend(
[
sns.SubscriptionResource(
Expand Down
40 changes: 38 additions & 2 deletions src/e3/aws/troposphere/sqs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from e3.aws.troposphere.iam.policy_document import PolicyDocument
from e3.aws.troposphere.iam.policy_statement import Allow

from troposphere import sqs, GetAtt, Ref
from troposphere import sns, sqs, GetAtt, Ref

if TYPE_CHECKING:
from typing import Optional
Expand All @@ -27,6 +27,10 @@ def __init__(
"""Initialize a SQS.
:param name: topic name
:param fifo: Set the queue type to fifo
:param visibility_timeout: set the length of time during which a message will be
unavailable after a message is delivered from the queue
:param dlq_name: dead letter queue name
"""
self.name = name
self.attr = {"QueueName": name, "VisibilityTimeout": visibility_timeout}
Expand All @@ -44,6 +48,7 @@ def __init__(
"deadLetterTargetArn": GetAtt(name_to_id(dlq_name), "Arn"),
"maxReceiveCount": "3",
}
self.optional_resources: list[AWSObject] = []

def allow_service_to_write(
self, service: str, name_suffix: str, condition: Optional[ConditionType] = None
Expand All @@ -64,6 +69,34 @@ def allow_service_to_write(
).as_dict,
)

def subscribe_to_sns_topic(
self, topic_arn: str, delivery_policy: dict | None = None
) -> None:
"""Subscribe to SNS topic.
:param topic_arn: ARN of the topic to subscribe
:param delivery_policy: The delivery policy to assign to the subscription
"""
sub_params = {
"Endpoint": self.arn,
"Protocol": "sqs",
"TopicArn": topic_arn,
}

if delivery_policy:
sub_params.update({"DeliveryPolicy": delivery_policy})

self.optional_resources.extend(
[
sns.SubscriptionResource(name_to_id(f"{self.name}Sub"), **sub_params),
self.allow_service_to_write(
service="sns",
name_suffix="Sub",
condition={"ArnLike": {"aws:SourceArn": topic_arn}},
),
]
)

@property
def arn(self) -> GetAtt:
"""SQS ARN."""
Expand All @@ -76,4 +109,7 @@ def ref(self) -> Ref:

def resources(self, stack: Stack) -> list[AWSObject]:
"""Compute AWS resources for the construct."""
return [sqs.Queue.from_dict(name_to_id(self.name), self.attr)]
return [
sqs.Queue.from_dict(name_to_id(self.name), self.attr),
*self.optional_resources,
]
Empty file.
88 changes: 88 additions & 0 deletions tests/tests_e3_aws/troposphere/sqs/sqs_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from __future__ import annotations

from e3.aws.troposphere import Stack
from e3.aws.troposphere.sqs import Queue

EXPECTED_QUEUE_DEFAULT_TEMPLATE = {
"Myqueue": {
"Properties": {"QueueName": "myqueue", "VisibilityTimeout": 30},
"Type": "AWS::SQS::Queue",
}
}


EXPECTED_QUEUE_TEMPLATE = {
"Myqueue": {
"Properties": {
"ContentBasedDeduplication": True,
"FifoQueue": True,
"QueueName": "myqueue.fifo",
"RedrivePolicy": {
"deadLetterTargetArn": {"Fn::GetAtt": ["Somedlqname", "Arn"]},
"maxReceiveCount": "3",
},
"VisibilityTimeout": 10,
},
"Type": "AWS::SQS::Queue",
}
}


EXPECTED_SQS_SUBSCRIPTION_TEMPLATE = {
"Myqueue": {
"Properties": {"QueueName": "myqueue", "VisibilityTimeout": 30},
"Type": "AWS::SQS::Queue",
},
"MyqueuePolicySub": {
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": "sqs:SendMessage",
"Condition": {"ArnLike": {"aws:SourceArn": "some_topic_arn"}},
"Effect": "Allow",
"Principal": {"Service": "sns.amazonaws.com"},
"Resource": {"Fn::GetAtt": ["Myqueue", "Arn"]},
}
],
"Version": "2012-10-17",
},
"Queues": [{"Ref": "Myqueue"}],
},
"Type": "AWS::SQS::QueuePolicy",
},
"MyqueueSub": {
"Properties": {
"Endpoint": {"Fn::GetAtt": ["Myqueue", "Arn"]},
"Protocol": "sqs",
"TopicArn": "some_topic_arn",
},
"Type": "AWS::SNS::Subscription",
},
}


def test_queue_default(stack: Stack) -> None:
"""Test Queue default creation."""
stack.add(Queue(name="myqueue"))
assert stack.export()["Resources"] == EXPECTED_QUEUE_DEFAULT_TEMPLATE


def test_queue(stack: Stack) -> None:
"""Test Queue creation."""
stack.add(
Queue(
name="myqueue", fifo=True, visibility_timeout=10, dlq_name="some_dlq_name"
)
)
assert stack.export()["Resources"] == EXPECTED_QUEUE_TEMPLATE


def test_subscribe_to_sns_topic(stack: Stack) -> None:
"""Test sqs subscription to sns topic."""
queue = Queue(name="myqueue")
queue.subscribe_to_sns_topic("some_topic_arn")

stack.add(queue)

assert stack.export()["Resources"] == EXPECTED_SQS_SUBSCRIPTION_TEMPLATE

0 comments on commit 2f687cf

Please sign in to comment.