Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SnsMessageConverter and NotificationMessageArgumentResolver to allow reading SNS messages on SQS #898

Merged
11 changes: 11 additions & 0 deletions docs/src/main/asciidoc/sqs.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,17 @@ Any number of `@SqsListener` annotations can be used in a bean class, and each a

NOTE: Queues declared in the same annotation will share the container, though each will have separate throughput and acknowledgement controls.

===== SNS Messages

Since 3.1.1, when receiving SNS messages through the `@SqsListener`, the message includes all attributes of the `SnsNotification`. To only receive need the `Message` part of the payload, you can utilize the `@SnsNotificationMessage` annotation.
[source, java]
----
@SqsListener("my-queue")
public void listen(@SnsNotificationMessage Pojo pojo) {
System.out.println(pojo.field);
}
----

===== Specifying a MessageListenerContainerFactory
A `MessageListenerContainerFactory` can be specified through the `factory` property.
Such factory will then be used to create the container for the annotated method.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,13 +274,17 @@ protected void configureDefaultHandlerMethodFactory(DefaultMessageHandlerMethodF
CompositeMessageConverter compositeMessageConverter = createCompositeMessageConverter();

List<HandlerMethodArgumentResolver> methodArgumentResolvers = new ArrayList<>(
createAdditionalArgumentResolvers());
createAdditionalArgumentResolvers(compositeMessageConverter, this.endpointRegistrar.getObjectMapper()));
methodArgumentResolvers.addAll(createArgumentResolvers(compositeMessageConverter));
this.endpointRegistrar.getMethodArgumentResolversConsumer().accept(methodArgumentResolvers);
handlerMethodFactory.setArgumentResolvers(methodArgumentResolvers);
handlerMethodFactory.afterPropertiesSet();
}

protected Collection<HandlerMethodArgumentResolver> createAdditionalArgumentResolvers(
tomazfernandes marked this conversation as resolved.
Show resolved Hide resolved
MessageConverter messageConverter, ObjectMapper objectMapper) {
return createAdditionalArgumentResolvers();
}
protected Collection<HandlerMethodArgumentResolver> createAdditionalArgumentResolvers() {
return Collections.emptyList();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2013-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.awspring.cloud.sqs.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Annotation that is used to map SNS notification value on an SQS Queue to a variable that is annotated. Used in
* Controllers method for handling/receiving SQS notifications.
*
* @author Michael Sosa
* @since 3.1.1
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
public @interface SnsNotificationMessage {

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,20 @@
*/
package io.awspring.cloud.sqs.annotation;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.awspring.cloud.sqs.config.Endpoint;
import io.awspring.cloud.sqs.config.SqsBeanNames;
import io.awspring.cloud.sqs.config.SqsEndpoint;
import io.awspring.cloud.sqs.listener.SqsHeaders;
import io.awspring.cloud.sqs.support.resolver.NotificationMessageArgumentResolver;
import io.awspring.cloud.sqs.support.resolver.QueueAttributesMethodArgumentResolver;
import io.awspring.cloud.sqs.support.resolver.SqsMessageMethodArgumentResolver;
import io.awspring.cloud.sqs.support.resolver.VisibilityHandlerMethodArgumentResolver;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;

/**
Expand Down Expand Up @@ -68,7 +73,17 @@ protected String getMessageListenerContainerRegistryBeanName() {
@Override
protected Collection<HandlerMethodArgumentResolver> createAdditionalArgumentResolvers() {
return Arrays.asList(new VisibilityHandlerMethodArgumentResolver(SqsHeaders.SQS_VISIBILITY_TIMEOUT_HEADER),
new SqsMessageMethodArgumentResolver(), new QueueAttributesMethodArgumentResolver());
new SqsMessageMethodArgumentResolver(), new QueueAttributesMethodArgumentResolver());
}

@Override
protected Collection<HandlerMethodArgumentResolver> createAdditionalArgumentResolvers(
MessageConverter messageConverter, ObjectMapper objectMapper) {
List<HandlerMethodArgumentResolver> argumentResolvers = new ArrayList<>(createAdditionalArgumentResolvers());
if (objectMapper != null) {
argumentResolvers.add(new NotificationMessageArgumentResolver(messageConverter, objectMapper));
}
return argumentResolvers;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright 2013-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.awspring.cloud.sqs.support.converter;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.util.Assert;

/**
* @author Michael Sosa
* @author gustavomonarin
* @since 3.1.1
*/
public class SnsMessageConverter implements SmartMessageConverter {

private final ObjectMapper jsonMapper;

private final MessageConverter payloadConverter;

public SnsMessageConverter(MessageConverter payloadConverter, ObjectMapper jsonMapper) {
Assert.notNull(payloadConverter, "payloadConverter must not be null");
Assert.notNull(jsonMapper, "jsonMapper must not be null");
this.payloadConverter = payloadConverter;
msosa marked this conversation as resolved.
Show resolved Hide resolved
this.jsonMapper = jsonMapper;
}

@Override
public Object fromMessage(Message<?> message, Class<?> targetClass, @Nullable Object conversionHint) {
Assert.notNull(message, "message must not be null");
Assert.notNull(targetClass, "target class must not be null");

JsonNode jsonNode;
try {
jsonNode = this.jsonMapper.readTree(message.getPayload().toString());
}
catch (Exception e) {
throw new MessageConversionException("Could not read JSON", e);
}
if (!jsonNode.has("Type")) {
throw new MessageConversionException(
"Payload: '" + message.getPayload() + "' does not contain a Type attribute", null);
}

if (!"Notification".equals(jsonNode.get("Type").asText())) {
throw new MessageConversionException("Payload: '" + message.getPayload() + "' is not a valid notification",
null);
}

if (!jsonNode.has("Message")) {
throw new MessageConversionException("Payload: '" + message.getPayload() + "' does not contain a message",
null);
}

String messagePayload = jsonNode.get("Message").asText();
GenericMessage<String> genericMessage = new GenericMessage<>(messagePayload);
Object convertedMessage = (payloadConverter instanceof SmartMessageConverter) ?
((SmartMessageConverter)this.payloadConverter).fromMessage(genericMessage, targetClass, conversionHint)
: this.payloadConverter.fromMessage(genericMessage, targetClass);
return new SnsMessageWrapper(jsonNode.path("Subject").asText(), convertedMessage);
}

@Override
public Object fromMessage(Message<?> message, Class<?> targetClass) {
return fromMessage( message, targetClass, null);
}

@Override
public Message<?> toMessage(Object payload, MessageHeaders headers) {
throw new UnsupportedOperationException(
"This converter only supports reading a SNS notification and not writing them");
}

@Override
public Message<?> toMessage(Object payload, MessageHeaders headers, Object conversionHint) {
throw new UnsupportedOperationException(
"This converter only supports reading a SNS notification and not writing them");
}

/**
* SNS Message wrapper.
*/
public record SnsMessageWrapper(String subject, Object message) {
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2013-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.awspring.cloud.sqs.support.resolver;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.awspring.cloud.sqs.annotation.SnsNotificationMessage;
import io.awspring.cloud.sqs.support.converter.SnsMessageConverter;
import org.springframework.core.MethodParameter;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
import org.springframework.util.Assert;

/**
* @author Michael Sosa
* @author gustavomonarin
* @since 3.1.1
*/
public class NotificationMessageArgumentResolver implements HandlerMethodArgumentResolver {

private final SmartMessageConverter converter;

public NotificationMessageArgumentResolver(MessageConverter converter, ObjectMapper jsonMapper) {
this.converter = new SnsMessageConverter(converter, jsonMapper);
}

@Override
public boolean supportsParameter(MethodParameter parameter) {
return parameter.hasParameterAnnotation(SnsNotificationMessage.class);
}

@Override
public Object resolveArgument(MethodParameter par, Message<?> msg) {
Object object = this.converter.fromMessage(msg, par.getParameterType(), par);
Assert.isInstanceOf(SnsMessageConverter.SnsMessageWrapper.class, object);
SnsMessageConverter.SnsMessageWrapper nr = (SnsMessageConverter.SnsMessageWrapper) object;
return nr.message();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@
*
* @author Tomaz Fernandes
* @author Mikhail Strokov
* @author Michael Sosa
* @author gustavomonarin
*/
@SpringBootTest
@TestPropertySource(properties = { "property.one=1", "property.five.seconds=5s",
Expand Down
Loading
Loading