Skip to content

Commit 6d182e0

Browse files
committed
Deprecate SerializationUtils.* constants in favor of KafkaUtils.*
Signed-off-by: Yanming Zhou <[email protected]>
1 parent 9c3718d commit 6d182e0

File tree

10 files changed

+49
-39
lines changed

10 files changed

+49
-39
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -527,9 +527,9 @@ public void accept(ConsumerRecord<?, ?> record, @Nullable Consumer<?, ?> consume
527527
tp = checkPartition(tp, consumer);
528528
}
529529
DeserializationException vDeserEx = SerializationUtils.getExceptionFromHeader(record,
530-
SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
530+
KafkaUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
531531
DeserializationException kDeserEx = SerializationUtils.getExceptionFromHeader(record,
532-
SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, this.logger);
532+
KafkaUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, this.logger);
533533
Headers headers = new RecordHeaders(record.headers().toArray());
534534
addAndEnhanceHeaders(record, exception, vDeserEx, kDeserEx, headers);
535535
ProducerRecord<Object, Object> outRecord = createProducerRecord(record, tp, headers,
@@ -546,13 +546,13 @@ private void addAndEnhanceHeaders(ConsumerRecord<?, ?> record, Exception excepti
546546
}
547547
if (kDeserEx != null) {
548548
if (!this.retainExceptionHeader) {
549-
headers.remove(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER);
549+
headers.remove(KafkaUtils.KEY_DESERIALIZER_EXCEPTION_HEADER);
550550
}
551551
this.exceptionHeadersCreator.create(headers, kDeserEx, true, this.headerNames);
552552
}
553553
if (vDeserEx != null) {
554554
if (!this.retainExceptionHeader) {
555-
headers.remove(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER);
555+
headers.remove(KafkaUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER);
556556
}
557557
this.exceptionHeadersCreator.create(headers, vDeserEx, false, this.headerNames);
558558
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2869,10 +2869,10 @@ private void invokeOnMessage(final ConsumerRecord<K, V> cRecord) {
28692869
throw ex;
28702870
}
28712871
if (cRecord.value() == null && this.checkNullValueForExceptions) {
2872-
checkDeser(cRecord, SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER);
2872+
checkDeser(cRecord, KafkaUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER);
28732873
}
28742874
if (cRecord.key() == null && this.checkNullKeyForExceptions) {
2875-
checkDeser(cRecord, SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER);
2875+
checkDeser(cRecord, KafkaUtils.KEY_DESERIALIZER_EXCEPTION_HEADER);
28762876
}
28772877
doInvokeOnMessage(cRecord);
28782878
if (this.nackSleepDurationMillis < 0 && !this.isManualImmediateAck) {

spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -577,8 +577,8 @@ protected Exception checkForErrors(ConsumerRecord<K, R> record) {
577577
* deserialization; null otherwise. If you need to determine whether it was the key or
578578
* value, call
579579
* {@link SerializationUtils#getExceptionFromHeader(ConsumerRecord, String, LogAccessor)}
580-
* with {@link SerializationUtils#KEY_DESERIALIZER_EXCEPTION_HEADER} and
581-
* {@link SerializationUtils#VALUE_DESERIALIZER_EXCEPTION_HEADER} instead.
580+
* with {@link KafkaUtils#KEY_DESERIALIZER_EXCEPTION_HEADER} and
581+
* {@link KafkaUtils#VALUE_DESERIALIZER_EXCEPTION_HEADER} instead.
582582
* @param record the record.
583583
* @param logger a {@link LogAccessor}.
584584
* @return the {@link DeserializationException} or {@code null}.
@@ -587,14 +587,14 @@ protected Exception checkForErrors(ConsumerRecord<K, R> record) {
587587
@Nullable
588588
public static DeserializationException checkDeserialization(ConsumerRecord<?, ?> record, LogAccessor logger) {
589589
DeserializationException exception = SerializationUtils.getExceptionFromHeader(record,
590-
SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, logger);
590+
KafkaUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, logger);
591591
if (exception != null) {
592592
logger.error(exception, () -> "Reply value deserialization failed for " + record.topic() + "-"
593593
+ record.partition() + "@" + record.offset());
594594
return exception;
595595
}
596596
exception = SerializationUtils.getExceptionFromHeader(record,
597-
SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, logger);
597+
KafkaUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, logger);
598598
if (exception != null) {
599599
logger.error(exception, () -> "Reply key deserialization failed for " + record.topic() + "-"
600600
+ record.partition() + "@" + record.offset());

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/ErrorHandlingDeserializer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.kafka.common.serialization.Deserializer;
2424
import org.jspecify.annotations.Nullable;
2525

26+
import org.springframework.kafka.support.KafkaUtils;
2627
import org.springframework.util.Assert;
2728
import org.springframework.util.ClassUtils;
2829
import org.springframework.validation.Validator;
@@ -208,10 +209,10 @@ private void setupValidator(Map<String, ?> configs) {
208209
public @Nullable T deserialize(String topic, Headers headers, byte[] data) {
209210
try {
210211
if (this.isForKey) {
211-
headers.remove(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER);
212+
headers.remove(KafkaUtils.KEY_DESERIALIZER_EXCEPTION_HEADER);
212213
}
213214
else {
214-
headers.remove(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER);
215+
headers.remove(KafkaUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER);
215216
}
216217
return this.delegate == null ? null : validate(this.delegate.deserialize(topic, headers, data));
217218
}

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/SerializationUtils.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,19 +49,25 @@ public final class SerializationUtils {
4949
/**
5050
* Header name for deserialization exceptions.
5151
* @since 2.8
52+
* @deprecated for removal in favor of {@link KafkaUtils#DESERIALIZER_EXCEPTION_HEADER_PREFIX}
5253
*/
54+
@Deprecated(forRemoval = true)
5355
public static final String DESERIALIZER_EXCEPTION_HEADER_PREFIX = KafkaUtils.DESERIALIZER_EXCEPTION_HEADER_PREFIX;
5456

5557
/**
5658
* Header name for deserialization exceptions.
5759
* @since 2.8
60+
* @deprecated for removal in favor of {@link KafkaUtils#KEY_DESERIALIZER_EXCEPTION_HEADER}
5861
*/
62+
@Deprecated(forRemoval = true)
5963
public static final String KEY_DESERIALIZER_EXCEPTION_HEADER = KafkaUtils.KEY_DESERIALIZER_EXCEPTION_HEADER;
6064

6165
/**
6266
* Header name for deserialization exceptions.
6367
* @since 2.8
68+
* @deprecated for removal in favor of {@link KafkaUtils#VALUE_DESERIALIZER_EXCEPTION_HEADER}
6469
*/
70+
@Deprecated(forRemoval = true)
6571
public static final String VALUE_DESERIALIZER_EXCEPTION_HEADER = KafkaUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER;
6672

6773
private SerializationUtils() {
@@ -175,8 +181,8 @@ data, isForKeyArg, new RuntimeException("Could not serialize type "
175181
}
176182
headers.add(
177183
new DeserializationExceptionHeader(isForKeyArg
178-
? KEY_DESERIALIZER_EXCEPTION_HEADER
179-
: VALUE_DESERIALIZER_EXCEPTION_HEADER,
184+
? KafkaUtils.KEY_DESERIALIZER_EXCEPTION_HEADER
185+
: KafkaUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER,
180186
stream.toByteArray()));
181187
}
182188

spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,11 @@
4949
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer.HeaderNames;
5050
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer.SingleRecordHeader;
5151
import org.springframework.kafka.support.KafkaHeaders;
52+
import org.springframework.kafka.support.KafkaUtils;
5253
import org.springframework.kafka.support.SendResult;
5354
import org.springframework.kafka.support.converter.ConversionException;
5455
import org.springframework.kafka.support.serializer.DeserializationException;
5556
import org.springframework.kafka.support.serializer.SerializationTestUtils;
56-
import org.springframework.kafka.support.serializer.SerializationUtils;
5757
import org.springframework.kafka.test.utils.KafkaTestUtils;
5858

5959
import static org.assertj.core.api.Assertions.assertThat;
@@ -170,9 +170,9 @@ void valueHeaderStripped() {
170170
KafkaOperations<?, ?> template = mock(KafkaOperations.class);
171171
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
172172
Headers headers = new RecordHeaders();
173-
headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER,
173+
headers.add(SerializationTestUtils.deserializationHeader(KafkaUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER,
174174
SerializationTestUtils.header(false)));
175-
headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER,
175+
headers.add(SerializationTestUtils.deserializationHeader(KafkaUtils.KEY_DESERIALIZER_EXCEPTION_HEADER,
176176
SerializationTestUtils.header(true)));
177177
Headers custom = new RecordHeaders();
178178
custom.add(new RecordHeader("foo", "bar".getBytes()));
@@ -189,8 +189,8 @@ void valueHeaderStripped() {
189189
assertThat(recovered.key()).isEqualTo("key".getBytes());
190190
assertThat(recovered.value()).isEqualTo("value".getBytes());
191191
headers = recovered.headers();
192-
assertThat(headers.lastHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)).isNull();
193-
assertThat(headers.lastHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER)).isNull();
192+
assertThat(headers.lastHeader(KafkaUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)).isNull();
193+
assertThat(headers.lastHeader(KafkaUtils.KEY_DESERIALIZER_EXCEPTION_HEADER)).isNull();
194194
assertThat(headers.lastHeader("foo")).isNotNull();
195195
assertThat(headers.lastHeader(KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE).value()).isEqualTo("testK".getBytes());
196196
assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE).value()).isEqualTo("testV".getBytes());
@@ -202,7 +202,7 @@ void keyHeaderStripped() {
202202
KafkaOperations<?, ?> template = mock(KafkaOperations.class);
203203
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
204204
Headers headers = new RecordHeaders();
205-
headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER,
205+
headers.add(SerializationTestUtils.deserializationHeader(KafkaUtils.KEY_DESERIALIZER_EXCEPTION_HEADER,
206206
SerializationTestUtils.header(true)));
207207
CompletableFuture future = new CompletableFuture();
208208
future.complete(new Object());
@@ -213,7 +213,7 @@ void keyHeaderStripped() {
213213
ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
214214
verify(template).send(captor.capture());
215215
headers = captor.getValue().headers();
216-
assertThat(headers.lastHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER)).isNull();
216+
assertThat(headers.lastHeader(KafkaUtils.KEY_DESERIALIZER_EXCEPTION_HEADER)).isNull();
217217
}
218218

219219
@SuppressWarnings({ "rawtypes", "unchecked" })
@@ -223,7 +223,7 @@ void keyDeserOnly() {
223223
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
224224
Headers headers = new RecordHeaders();
225225
DeserializationException deserEx = SerializationTestUtils.createDeserEx(true);
226-
headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER,
226+
headers.add(SerializationTestUtils.deserializationHeader(KafkaUtils.KEY_DESERIALIZER_EXCEPTION_HEADER,
227227
SerializationTestUtils.header(deserEx)));
228228
CompletableFuture future = new CompletableFuture();
229229
future.complete(new Object());
@@ -246,9 +246,9 @@ void headersNotStripped() {
246246
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
247247
recoverer.setRetainExceptionHeader(true);
248248
Headers headers = new RecordHeaders();
249-
headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER,
249+
headers.add(SerializationTestUtils.deserializationHeader(KafkaUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER,
250250
SerializationTestUtils.header(false)));
251-
headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER,
251+
headers.add(SerializationTestUtils.deserializationHeader(KafkaUtils.KEY_DESERIALIZER_EXCEPTION_HEADER,
252252
SerializationTestUtils.header(true)));
253253
CompletableFuture future = new CompletableFuture();
254254
future.complete(new Object());
@@ -259,8 +259,8 @@ void headersNotStripped() {
259259
ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
260260
verify(template).send(captor.capture());
261261
headers = captor.getValue().headers();
262-
assertThat(headers.lastHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)).isNotNull();
263-
assertThat(headers.lastHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER)).isNotNull();
262+
assertThat(headers.lastHeader(KafkaUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)).isNotNull();
263+
assertThat(headers.lastHeader(KafkaUtils.KEY_DESERIALIZER_EXCEPTION_HEADER)).isNotNull();
264264
assertThat(new String(headers.lastHeader(KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE).value())).isEqualTo("testK");
265265
assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE).value()).isEqualTo("testV".getBytes());
266266
}

spring-kafka/src/test/java/org/springframework/kafka/listener/ErrorHandlingDeserializerTests.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
4848
import org.springframework.kafka.core.KafkaTemplate;
4949
import org.springframework.kafka.core.ProducerFactory;
50+
import org.springframework.kafka.support.KafkaUtils;
5051
import org.springframework.kafka.support.serializer.DeserializationException;
5152
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
5253
import org.springframework.kafka.support.serializer.SerializationUtils;
@@ -110,7 +111,7 @@ public void close() {
110111
Headers headers = new RecordHeaders();
111112
Object result = ehd.deserialize("topic", headers, "foo".getBytes());
112113
assertThat(result).isNull();
113-
Header deser = headers.lastHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER);
114+
Header deser = headers.lastHeader(KafkaUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER);
114115
assertThat(new ObjectInputStream(new ByteArrayInputStream(deser.value())).readObject()).isInstanceOf(DeserializationException.class);
115116
ehd.close();
116117
}
@@ -135,7 +136,7 @@ public String deserialize(String topic, Headers headers, byte[] data) {
135136
Headers headers = new RecordHeaders();
136137
ehd.deserialize("foo", headers, new byte[1]);
137138
DeserializationException dex = SerializationUtils.byteArrayToDeserializationException(null,
138-
headers.lastHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
139+
headers.lastHeader(KafkaUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
139140
assertThat(dex.getCause().getMessage())
140141
.contains("Could not serialize")
141142
.contains("original exception message");
@@ -150,7 +151,7 @@ void validate() {
150151
assertThat(ehd.deserialize("foo", headers, "foo".getBytes())).isEqualTo("foo");
151152
ehd.deserialize("foo", headers, "bar".getBytes());
152153
DeserializationException ex = SerializationUtils.byteArrayToDeserializationException(null,
153-
headers.lastHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
154+
headers.lastHeader(KafkaUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
154155
assertThat(ex.getCause()).isInstanceOf(IllegalStateException.class)
155156
.extracting("message", InstanceOfAssertFactories.STRING)
156157
.contains("validation failure");
@@ -170,7 +171,7 @@ public boolean supports(Class<?> clazz) {
170171
});
171172
ehd.deserialize("foo", headers, "baz".getBytes());
172173
ex = SerializationUtils.byteArrayToDeserializationException(null,
173-
headers.lastHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
174+
headers.lastHeader(KafkaUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
174175
assertThat(ex.getCause()).isInstanceOf(IllegalArgumentException.class)
175176
.extracting("message")
176177
.isEqualTo("test validation");

spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchAdapterConversionErrorsTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.springframework.kafka.listener.BatchListenerFailedException;
3535
import org.springframework.kafka.listener.ListenerExecutionFailedException;
3636
import org.springframework.kafka.support.KafkaHeaders;
37+
import org.springframework.kafka.support.KafkaUtils;
3738
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
3839
import org.springframework.kafka.support.converter.ConversionException;
3940
import org.springframework.kafka.support.converter.JacksonJsonMessageConverter;
@@ -74,7 +75,7 @@ void testNullInList(@Autowired KafkaListenerEndpointRegistry registry, @Autowire
7475
.isEqualTo(1);
7576
assertThat(listener.values).containsExactly(new Foo("baz"), null, new Foo("qux"));
7677
DeserializationException vDeserEx = SerializationUtils.getExceptionFromHeader(junkRecord,
77-
SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, null);
78+
KafkaUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, null);
7879
assertThat(vDeserEx).isNotNull();
7980
assertThat(vDeserEx.getData()).isEqualTo("JUNK".getBytes());
8081
}

spring-kafka/src/test/java/org/springframework/kafka/support/JsonKafkaHeaderMapperTests.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -582,17 +582,17 @@ void deserializationExceptionHeadersAreMappedAsNonByteArray() {
582582
JsonKafkaHeaderMapper mapper = new JsonKafkaHeaderMapper();
583583

584584
byte[] keyDeserExceptionBytes = SerializationTestUtils.header(true);
585-
Header keyHeader = SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER,
585+
Header keyHeader = SerializationTestUtils.deserializationHeader(KafkaUtils.KEY_DESERIALIZER_EXCEPTION_HEADER,
586586
keyDeserExceptionBytes);
587587
byte[] valueDeserExceptionBytes = SerializationTestUtils.header(false);
588-
Header valueHeader = SerializationTestUtils.deserializationHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER,
588+
Header valueHeader = SerializationTestUtils.deserializationHeader(KafkaUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER,
589589
valueDeserExceptionBytes);
590590
Headers headers = new RecordHeaders(
591591
new Header[] { keyHeader, valueHeader });
592592
Map<String, Object> springHeaders = new HashMap<>();
593593
mapper.toHeaders(headers, springHeaders);
594-
assertThat(springHeaders.get(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER)).isEqualTo(keyHeader);
595-
assertThat(springHeaders.get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)).isEqualTo(valueHeader);
594+
assertThat(springHeaders.get(KafkaUtils.KEY_DESERIALIZER_EXCEPTION_HEADER)).isEqualTo(keyHeader);
595+
assertThat(springHeaders.get(KafkaUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)).isEqualTo(valueHeader);
596596

597597
LogAccessor logger = new LogAccessor(this.getClass());
598598

@@ -605,8 +605,8 @@ void deserializationExceptionHeadersAreMappedAsNonByteArray() {
605605

606606
headers = new RecordHeaders();
607607
mapper.fromHeaders(new MessageHeaders(springHeaders), headers);
608-
assertThat(headers.lastHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER)).isNull();
609-
assertThat(headers.lastHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)).isNull();
608+
assertThat(headers.lastHeader(KafkaUtils.KEY_DESERIALIZER_EXCEPTION_HEADER)).isNull();
609+
assertThat(headers.lastHeader(KafkaUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)).isNull();
610610
}
611611

612612
@Test

spring-kafka/src/test/java/org/springframework/kafka/support/serializer/SerializationUtilsTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.mockito.ArgumentCaptor;
2828

2929
import org.springframework.core.log.LogAccessor;
30+
import org.springframework.kafka.support.KafkaUtils;
3031

3132
import static org.assertj.core.api.Assertions.assertThat;
3233
import static org.mockito.BDDMockito.given;
@@ -46,7 +47,7 @@ public class SerializationUtilsTests {
4647
@Test
4748
void foreignDeserEx() {
4849
RecordHeaders headers = new RecordHeaders(
49-
List.of(new RecordHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, "junk".getBytes())));
50+
List.of(new RecordHeader(KafkaUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, "junk".getBytes())));
5051
ConsumerRecord<String, String> rec = mock(ConsumerRecord.class);
5152
willReturn(headers).given(rec).headers();
5253
given(rec.topic()).willReturn("foo");
@@ -56,7 +57,7 @@ void foreignDeserEx() {
5657
ArgumentCaptor<Supplier<String>> captor = ArgumentCaptor.forClass(Supplier.class);
5758
willAnswer(inv -> null).given(logger).warn(captor.capture());
5859
assertThat(SerializationUtils.getExceptionFromHeader(rec,
59-
SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, logger)).isNull();
60+
KafkaUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, logger)).isNull();
6061
assertThat(captor.getValue().get())
6162
.isEqualTo("Foreign deserialization exception header in (foo-1@0) ignored; possible attack?");
6263
}

0 commit comments

Comments
 (0)