Skip to content

Commit

Permalink
Polishing
Browse files Browse the repository at this point in the history
  • Loading branch information
tomazfernandes committed Aug 20, 2024
1 parent ac759cf commit b5a0995
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.SimpleMessageConverter;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.converter.StringMessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
Expand Down Expand Up @@ -100,7 +102,7 @@ public void setObjectMapper(ObjectMapper objectMapper) {
converter.setObjectMapper(objectMapper);
}

protected Optional<MappingJackson2MessageConverter> getMappingJackson2MessageConverter() {
private Optional<MappingJackson2MessageConverter> getMappingJackson2MessageConverter() {
return this.payloadMessageConverter instanceof CompositeMessageConverter compositeConverter
? compositeConverter.getConverters().stream()
.filter(converter -> converter instanceof MappingJackson2MessageConverter)
Expand Down Expand Up @@ -174,34 +176,24 @@ private MessageHeaders getContextHeaders(S message, MessageConversionContext con
return ((ContextAwareHeaderMapper<S>) this.headerMapper).createContextHeaders(message, context);
}

private Object convertPayload(S message, MessageHeaders messageHeaders, @Nullable MessageConversionContext context) {
private Object convertPayload(S message, MessageHeaders messageHeaders,
@Nullable MessageConversionContext context) {
Message<?> messagingMessage = MessageBuilder.createMessage(getPayloadToDeserialize(message), messageHeaders);
Class<?> targetType = getTargetType(messagingMessage, context);
if (targetType == null) {
return messagingMessage.getPayload();
}
Object convertedPayload = this.payloadMessageConverter.fromMessage(messagingMessage, targetType);
return Objects.requireNonNull(convertedPayload, "payloadMessageConverter returned null payload");
return targetType != null
? Objects.requireNonNull(this.payloadMessageConverter.fromMessage(messagingMessage, targetType),
"payloadMessageConverter returned null payload")
: messagingMessage.getPayload();
}


@Nullable
private Class<?> getTargetType(Message<?> messagingMessage, @Nullable MessageConversionContext context) {
Class<?> classFromTypeMapper = this.payloadTypeMapper.apply(messagingMessage);

if (context != null && context.getPayloadClass() != null) {
if (!context.getPayloadClass().equals(String.class)) {
return context.getPayloadClass();
}
if (classFromTypeMapper != null && !classFromTypeMapper.equals(String.class)) {
return classFromTypeMapper;
}
}

return classFromTypeMapper != null ? classFromTypeMapper : (context != null ? context.getPayloadClass() : null);
return classFromTypeMapper == null && context != null && context.getPayloadClass() != null
? context.getPayloadClass()
: classFromTypeMapper;
}


protected abstract Object getPayloadToDeserialize(S message);

@Nullable
Expand All @@ -227,14 +219,20 @@ public MessageConversionContext createMessageConversionContext() {
public S fromMessagingMessage(Message<?> message, @Nullable MessageConversionContext context) {
// We must make sure the message id stays consistent throughout this process
MessageHeaders headers = getMessageHeaders(message);
Message<?> convertedMessage = Objects.requireNonNull(
this.payloadMessageConverter.toMessage(message.getPayload(), message.getHeaders()),
() -> "payloadMessageConverter returned null message for message " + message);
Message<?> convertedMessage = convertPayload(message, message.getPayload());
MessageHeaders completeHeaders = MessageHeaderUtils.addHeadersIfAbsent(headers, convertedMessage.getHeaders());
S messageWithHeaders = this.headerMapper.fromHeaders(completeHeaders);
return doConvertMessage(messageWithHeaders, convertedMessage.getPayload());
}

private Message<?> convertPayload(Message<?> message, Object payload) {
return Objects.requireNonNull(
this.payloadMessageConverter instanceof SmartMessageConverter smartConverter
? smartConverter.toMessage(payload, message.getHeaders(), String.class)
: this.payloadMessageConverter.toMessage(payload, message.getHeaders())
, () -> "payloadMessageConverter returned null message for message " + message);
}

private MessageHeaders getMessageHeaders(Message<?> message) {
String typeHeaderName = this.payloadTypeHeaderFunction.apply(message);
return typeHeaderName != null
Expand All @@ -246,6 +244,7 @@ private MessageHeaders getMessageHeaders(Message<?> message) {

private CompositeMessageConverter createDefaultCompositeMessageConverter() {
List<MessageConverter> messageConverters = new ArrayList<>();
messageConverters.add(new SimpleMessageConverter());
messageConverters.add(createStringMessageConverter());
messageConverters.add(createDefaultMappingJackson2MessageConverter());
return new CompositeMessageConverter(messageConverters);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,15 @@
*/
package io.awspring.cloud.sqs.support.converter;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.util.Assert;

/**
* {@link MessagingMessageConverter} implementation for converting SQS
* {@link software.amazon.awssdk.services.sqs.model.Message} instances to Spring Messaging {@link Message} instances.
*
* @author Dongha kim
* @author Tomaz Fernandes
* @author Dongha kim
* @since 3.0
* @see SqsHeaderMapper
* @see SqsMessageConversionContext
Expand All @@ -42,24 +38,7 @@ protected HeaderMapper<software.amazon.awssdk.services.sqs.model.Message> create

@Override
protected Object getPayloadToDeserialize(software.amazon.awssdk.services.sqs.model.Message message) {
String body = message.body();

ObjectMapper objectMapper = getMappingJackson2MessageConverter()
.map(MappingJackson2MessageConverter::getObjectMapper)
.orElse(new ObjectMapper());

try {
ObjectNode jsonNode = objectMapper.readValue(body, ObjectNode.class);
return objectMapper.writeValueAsString(jsonNode);
} catch (JsonProcessingException e) {
try {
String decodedBody = objectMapper.readValue(body, String.class);
ObjectNode jsonNode = objectMapper.readValue(decodedBody, ObjectNode.class);
return objectMapper.writeValueAsString(jsonNode);
} catch (JsonProcessingException e2) {
return body;
}
}
return message.body();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public class SqsTemplateIntegrationTests extends BaseSqsIntegrationTest {

private static final String HANDLES_CONTENT_DEDUPLICATION_QUEUE_NAME = "handles-content-deduplication-queue.fifo";

private static final String SENDS_AND_RECEIVES_JSON_MESSAGE_QUEUE_NAME = "send-receive-json-message-queue";

@Autowired
private SqsAsyncClient asyncClient;

Expand All @@ -87,7 +89,9 @@ static void beforeTests() {
createQueue(client, RECORD_WITHOUT_TYPE_HEADER_QUEUE_NAME),
createQueue(client, RETURNS_ON_PARTIAL_BATCH_QUEUE_NAME),
createQueue(client, THROWS_ON_PARTIAL_BATCH_QUEUE_NAME),
createQueue(client, SENDS_AND_RECEIVES_MANUAL_ACK_QUEUE_NAME), createQueue(client, EMPTY_QUEUE_NAME),
createQueue(client, SENDS_AND_RECEIVES_JSON_MESSAGE_QUEUE_NAME),
createQueue(client, SENDS_AND_RECEIVES_MANUAL_ACK_QUEUE_NAME),
createQueue(client, EMPTY_QUEUE_NAME),
createFifoQueue(client, SENDS_AND_RECEIVES_MESSAGE_FIFO_QUEUE_NAME),
createFifoQueue(client, SENDS_AND_RECEIVES_BATCH_FIFO_QUEUE_NAME),
createFifoQueue(client, HANDLES_CONTENT_DEDUPLICATION_QUEUE_NAME,
Expand Down Expand Up @@ -184,18 +188,22 @@ void shouldSendAndReceiveWithManualAcknowledgement() {

@Test
void shouldSendAndReceiveJsonString() {
SqsOperations template = SqsTemplate.newSyncTemplate(this.asyncClient);
SqsOperations template = SqsTemplate.builder()
.sqsAsyncClient(this.asyncClient)
.configureDefaultConverter(AbstractMessagingMessageConverter::doNotSendPayloadTypeHeader)
.buildSyncTemplate();
String jsonString = """
{
"propertyOne": "hello",
"propertyTwo": "sqs!"
}
""";
SampleRecord expectedPayload = new SampleRecord("hello", "sqs!");
SendResult<Object> result = template.send(to -> to.queue(SENDS_AND_RECEIVES_MESSAGE_QUEUE_NAME)
SendResult<Object> result = template.send(to -> to.queue(SENDS_AND_RECEIVES_JSON_MESSAGE_QUEUE_NAME)
.payload(jsonString).header(MessageHeaders.CONTENT_TYPE, "application/json"));
assertThat(result).isNotNull();
Optional<Message<SampleRecord>> receivedMessage = template.receive(from -> from.queue(SENDS_AND_RECEIVES_MESSAGE_QUEUE_NAME), SampleRecord.class);
Optional<Message<SampleRecord>> receivedMessage = template
.receive(from -> from.queue(SENDS_AND_RECEIVES_JSON_MESSAGE_QUEUE_NAME), SampleRecord.class);
assertThat(receivedMessage).isPresent().get().extracting(Message::getPayload).isEqualTo(expectedPayload);
}

Expand Down

0 comments on commit b5a0995

Please sign in to comment.