A Spring Boot starter to send and consume messages to / from an AWS SQS queue in a robust manner.
Note that this library has not been battle-tested in production, yet (at least not by me).
Include the dependency in your build file.
If your Spring Boot application uses @EnableAutoConfiguration
(which is the default) the SQS starter will be loaded into the Spring application context automatically.
To send messages to SQS, extend SqsMessagePublisher
:
class TestMessagePublisher extends SqsMessagePublisher<TestMessage> {
TestMessagePublisher(AmazonSQS sqsClient, ObjectMapper objectMapper) {
super("http://localhost:4576/queue/testMessages", sqsClient, objectMapper);
}
}
Send messages by calling the publish()
method:
TestMessagePublisher messagePublisher = new TestMessagePublisher(...);
messagePublisher.publish(new TestMessage("this is a test message"));
The publisher will automatically retry up to 3 times with a short exponential backoff if SQS returns an error. You can also pass your own RetryRegistry into the constructor to customize the retry behavior.
For consuming messages, the SQS starter lets you configure:
- a message poller, which polls messages from SQS at a regular interval, and
- a thread pool of message handlers, which process the messages received by the poller.
The starter assumes that all messages from an SQS queue are handled by the same message handler.
First, implement the SqsMessageHandler
interface:
class TestMessageHandler implements SqsMessageHandler<TestMessage> {
@Override
public void handle(TestMessage message) {
// handle the message
}
@Override
public Class<TestMessage> messageType() {
return TestMessage.class;
}
}
The message handler must be thread-safe! It will be called by multiple threads in parallel!
Then, register a bean of type SqsMessageHandlerRegistration
in the Spring application context:
@Component
class TestMessageHandlerRegistration implements SqsMessageHandlerRegistration<TestMessage> {
private final AmazonSQS client;
private final ObjectMapper objectMapper;
private final TestMessageHandler messageHandler;
public TestMessageHandlerRegistration(
AmazonSQS client,
ObjectMapper objectMapper,
TestMessageHandler messageHandler) {
this.client = client;
this.objectMapper = objectMapper;
this.messageHandler = messageHandler;
}
@Override
public SqsMessageHandler<TestMessage> messageHandler() {
return this.messageHandler;
}
@Override
public String name() {
return "testMessageHandler";
}
@Override
public SqsMessageHandlerProperties messageHandlerProperties() {
return new SqsMessageHandlerProperties();
}
@Override
public SqsMessagePollerProperties messagePollerProperties() {
return new SqsMessagePollerProperties("http://localhost:4576/queue/testMessages");
}
@Override
public AmazonSQS sqsClient() {
return this.client;
}
@Override
public ObjectMapper objectMapper() {
return this.objectMapper;
}
}
The SQS starter will set up a poller for each SqsMessageHandlerRegistration
bean it finds in the Spring application context.
You can configure the behavior of the poller (like the waiting intervall between polls) in SqsMessagePollerProperties
.
You can configure the message handler thread pool in SqsMessageHandlerProperties
.
If you're using JUnit Jupiter for testing, you can use the @SQSTest
annotation provided by the sqs-starter-test module to easily create local tests against an SQS queue:
@SqsTest(queueNames = "testMessages")
@SpringBootTest
class SendAndReceiveIntegrationTest {
@Autowired
private AmazonSQS sqsClient;
@Autowired
private TestMessageHandler messageHandler;
@Autowired
private TestMessagePublisher messagePublisher;
@Test
void sendAndReceive() {
messagePublisher.publish(new TestMessage("message 1"));
await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> assertThat(messageHandler.getCount()).isEqualTo(1));
}
}
The @SQSTest
annotation makes use of localstack to start up a Docker container with a mock SQS server and it will automatically create the specified queues for you.