|
28 | 28 | import com.rabbitmq.stream.MessageBuilder.PropertiesBuilder; |
29 | 29 | import com.rabbitmq.stream.Properties; |
30 | 30 | import com.rabbitmq.stream.codec.WrapperMessageBuilder; |
| 31 | +import org.jspecify.annotations.Nullable; |
31 | 32 |
|
32 | 33 | import org.springframework.amqp.core.Message; |
33 | 34 | import org.springframework.amqp.core.MessageProperties; |
|
36 | 37 | import org.springframework.lang.Nullable; |
37 | 38 | import org.springframework.rabbit.stream.support.StreamMessageProperties; |
38 | 39 | import org.springframework.util.Assert; |
| 40 | +import org.springframework.util.StringUtils; |
39 | 41 |
|
40 | 42 | /** |
41 | 43 | * Default {@link StreamMessageConverter}. |
42 | 44 | * |
43 | 45 | * @author Gary Russell |
44 | 46 | * @author Ngoc Nhan |
| 47 | + * @author Artem Bilan |
| 48 | + * |
45 | 49 | * @since 2.4 |
46 | 50 | * |
47 | 51 | */ |
@@ -92,30 +96,37 @@ public com.rabbitmq.stream.Message fromMessage(Message message) throws MessageCo |
92 | 96 | Assert.isInstanceOf(StreamMessageProperties.class, props); |
93 | 97 | StreamMessageProperties mProps = (StreamMessageProperties) props; |
94 | 98 | JavaUtils.INSTANCE |
95 | | - .acceptIfNotNull(mProps.getMessageId(), propsBuilder::messageId) // TODO different types |
| 99 | + .acceptIfNotNull(mProps.getMessageId(), propsBuilder::messageId) |
96 | 100 | .acceptIfNotNull(mProps.getUserId(), usr -> propsBuilder.userId(usr.getBytes(this.charset))) |
97 | 101 | .acceptIfNotNull(mProps.getTo(), propsBuilder::to) |
98 | 102 | .acceptIfNotNull(mProps.getSubject(), propsBuilder::subject) |
99 | 103 | .acceptIfNotNull(mProps.getReplyTo(), propsBuilder::replyTo) |
100 | | - .acceptIfNotNull(mProps.getCorrelationId(), propsBuilder::correlationId) // TODO different types |
| 104 | + .acceptIfNotNull(mProps.getCorrelationId(), propsBuilder::correlationId) |
101 | 105 | .acceptIfNotNull(mProps.getContentType(), propsBuilder::contentType) |
102 | 106 | .acceptIfNotNull(mProps.getContentEncoding(), propsBuilder::contentEncoding) |
103 | | - .acceptIfNotNull(mProps.getExpiration(), exp -> propsBuilder.absoluteExpiryTime(Long.parseLong(exp))) |
104 | 107 | .acceptIfNotNull(mProps.getCreationTime(), propsBuilder::creationTime) |
105 | 108 | .acceptIfNotNull(mProps.getGroupId(), propsBuilder::groupId) |
106 | 109 | .acceptIfNotNull(mProps.getGroupSequence(), propsBuilder::groupSequence) |
107 | 110 | .acceptIfNotNull(mProps.getReplyToGroupId(), propsBuilder::replyToGroupId); |
108 | 111 | ApplicationPropertiesBuilder appPropsBuilder = builder.applicationProperties(); |
109 | | - if (!mProps.getHeaders().isEmpty()) { |
110 | | - mProps.getHeaders().forEach((key, val) -> { |
111 | | - mapProp(key, val, appPropsBuilder); |
112 | | - }); |
| 112 | + |
| 113 | + long creationTime = mProps.getCreationTime(); |
| 114 | + if (creationTime <= 0) { |
| 115 | + creationTime = System.currentTimeMillis(); |
| 116 | + } |
| 117 | + propsBuilder.creationTime(creationTime); |
| 118 | + |
| 119 | + String expiration = mProps.getExpiration(); |
| 120 | + if (StringUtils.hasText(expiration)) { |
| 121 | + propsBuilder.absoluteExpiryTime(creationTime + Long.parseLong(expiration)); |
113 | 122 | } |
| 123 | + |
| 124 | + mProps.getHeaders().forEach((key, val) -> mapProp(key, val, appPropsBuilder)); |
114 | 125 | builder.addData(message.getBody()); |
115 | 126 | return builder.build(); |
116 | 127 | } |
117 | 128 |
|
118 | | - private void mapProp(String key, Object val, ApplicationPropertiesBuilder builder) { // NOSONAR - complexity |
| 129 | + private void mapProp(String key, @Nullable Object val, ApplicationPropertiesBuilder builder) { |
119 | 130 | if (val instanceof String string) { |
120 | 131 | builder.entry(key, string); |
121 | 132 | } |
@@ -155,19 +166,28 @@ private void toMessageProperties(com.rabbitmq.stream.Message streamMessage, |
155 | 166 | if (properties != null) { |
156 | 167 | JavaUtils.INSTANCE |
157 | 168 | .acceptIfNotNull(properties.getMessageIdAsString(), mProps::setMessageId) |
158 | | - .acceptIfNotNull(properties.getUserId(), usr -> mProps.setUserId(new String(usr, this.charset))) |
| 169 | + .acceptIfNotNull(properties.getUserId(), |
| 170 | + usr -> mProps.setUserId(new String(usr, this.charset))) |
159 | 171 | .acceptIfNotNull(properties.getTo(), mProps::setTo) |
160 | 172 | .acceptIfNotNull(properties.getSubject(), mProps::setSubject) |
161 | 173 | .acceptIfNotNull(properties.getReplyTo(), mProps::setReplyTo) |
162 | 174 | .acceptIfNotNull(properties.getCorrelationIdAsString(), mProps::setCorrelationId) |
163 | 175 | .acceptIfNotNull(properties.getContentType(), mProps::setContentType) |
164 | 176 | .acceptIfNotNull(properties.getContentEncoding(), mProps::setContentEncoding) |
165 | | - .acceptIfNotNull(properties.getAbsoluteExpiryTime(), |
166 | | - exp -> mProps.setExpiration(Long.toString(exp))) |
167 | | - .acceptIfNotNull(properties.getCreationTime(), mProps::setCreationTime) |
168 | 177 | .acceptIfNotNull(properties.getGroupId(), mProps::setGroupId) |
169 | 178 | .acceptIfNotNull(properties.getGroupSequence(), mProps::setGroupSequence) |
170 | 179 | .acceptIfNotNull(properties.getReplyToGroupId(), mProps::setReplyToGroupId); |
| 180 | + |
| 181 | + long creationTime = properties.getCreationTime(); |
| 182 | + if (creationTime <= 0) { |
| 183 | + creationTime = System.currentTimeMillis(); |
| 184 | + } |
| 185 | + mProps.setCreationTime(creationTime); |
| 186 | + |
| 187 | + long absoluteExpiryTime = properties.getAbsoluteExpiryTime(); |
| 188 | + if (absoluteExpiryTime > creationTime) { |
| 189 | + mProps.setExpiration(Long.toString(absoluteExpiryTime - creationTime)); |
| 190 | + } |
171 | 191 | } |
172 | 192 | Map<String, Object> applicationProperties = streamMessage.getApplicationProperties(); |
173 | 193 | if (applicationProperties != null) { |
|
0 commit comments