Skip to content

Commit e0f7ec2

Browse files
committed
apply PulsarTxnTestsBase
Signed-off-by: Andrey Litvitski <[email protected]>
1 parent ea472c2 commit e0f7ec2

File tree

3 files changed

+45
-20
lines changed

3 files changed

+45
-20
lines changed

spring-pulsar/spring-pulsar.gradle

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ dependencies {
1919
api 'org.springframework:spring-context'
2020
api 'org.springframework:spring-messaging'
2121
api 'org.springframework:spring-tx'
22-
testImplementation libs.spring.boot.starter.pulsar
2322
api (libs.spring.retry) {
2423
exclude group: 'org.springframework'
2524
}

spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTxnTests.java

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,11 @@
2929
import org.junit.jupiter.api.Test;
3030

3131
import org.springframework.beans.factory.annotation.Autowired;
32-
import org.springframework.boot.pulsar.autoconfigure.PulsarAutoConfiguration;
33-
import org.springframework.boot.pulsar.autoconfigure.PulsarContainerFactoryCustomizer;
3432
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
3533
import org.springframework.context.annotation.Configuration;
34+
import org.springframework.pulsar.listener.PulsarTxnTestsBase.TestPulsarContainerPropertiesCustomizer;
3635
import org.springframework.pulsar.annotation.EnablePulsar;
3736
import org.springframework.pulsar.annotation.PulsarListener;
38-
import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory;
3937
import org.springframework.pulsar.config.PulsarListenerEndpointRegistry;
4038
import org.springframework.pulsar.core.PulsarTemplate;
4139
import org.springframework.pulsar.listener.PulsarListenerTxnTests.BatchListenerWithCommit.BatchListenerWithCommitConfig;
@@ -276,10 +274,8 @@ void throwsExceptionWhenTransactionsAreRequired() {
276274
assertThatIllegalStateException().isThrownBy(() -> {
277275
var context = new AnnotationConfigApplicationContext();
278276
context.register(TopLevelConfig.class, TransactionsDisabledOnListenerConfig.class);
279-
context.registerBean(PulsarAutoConfiguration.class);
280-
context.registerBean("containerPropsRequiredCustomizer", PulsarContainerFactoryCustomizer.class,
281-
() -> ((PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>>) (
282-
cf) -> cf.getContainerProperties().transactions().setRequired(true)));
277+
context.registerBean("containerPropsRequiredCustomizer", TestPulsarContainerPropertiesCustomizer.class,
278+
() -> containerProps -> containerProps.transactions().setRequired(true));
283279
context.refresh();
284280
}).withMessage("Listener w/ id [%s] requested no transactions but txn are required".formatted(LISTENER_ID));
285281
}
@@ -288,10 +284,9 @@ void throwsExceptionWhenTransactionsAreRequired() {
288284
void disablesTransactionsWhenTransactionsAreNotRequired() {
289285
try (var context = new AnnotationConfigApplicationContext()) {
290286
context.register(TopLevelConfig.class, TransactionsDisabledOnListenerConfig.class);
291-
context.registerBean(PulsarAutoConfiguration.class);
292-
context.registerBean("containerPropsNotRequiredCustomizer", PulsarContainerFactoryCustomizer.class,
293-
() -> ((PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>>) (
294-
cf) -> cf.getContainerProperties().transactions().setRequired(false)));
287+
context.registerBean("containerPropsNotRequiredCustomizer",
288+
TestPulsarContainerPropertiesCustomizer.class,
289+
() -> containerProps -> containerProps.transactions().setRequired(false));
295290
context.refresh();
296291
var container = context.getBean(PulsarListenerEndpointRegistry.class).getListenerContainer(LISTENER_ID);
297292
assertThat(container).isNotNull();
@@ -322,10 +317,8 @@ void ignoresSettingWhenNoTxnManagerAvailable() {
322317
assertThatException().isThrownBy(() -> {
323318
var context = new AnnotationConfigApplicationContext();
324319
context.register(TopLevelConfig.class, TransactionsEnabledOnListenerConfig.class);
325-
context.registerBean(PulsarAutoConfiguration.class);
326-
context.registerBean("removeTxnManagerCustomizer", PulsarContainerFactoryCustomizer.class,
327-
() -> ((PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>>) (
328-
cf) -> cf.getContainerProperties().transactions().setTransactionManager(null)));
320+
context.registerBean("removeTxnManagerCustomizer", TestPulsarContainerPropertiesCustomizer.class,
321+
() -> containerProps -> containerProps.transactions().setTransactionManager(null));
329322
context.refresh();
330323
})
331324
.withCauseInstanceOf(IllegalStateException.class)
@@ -337,10 +330,9 @@ void ignoresSettingWhenNoTxnManagerAvailable() {
337330
void enablesTransactionsWhenTxnManagerAvailable() {
338331
try (var context = new AnnotationConfigApplicationContext()) {
339332
context.register(TopLevelConfig.class, TransactionsEnabledOnListenerConfig.class);
340-
context.registerBean(PulsarAutoConfiguration.class);
341-
context.registerBean("containerPropsNotRequiredCustomizer", PulsarContainerFactoryCustomizer.class,
342-
() -> ((PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>>) (
343-
cf) -> cf.getContainerProperties().transactions().setEnabled(false)));
333+
context.registerBean("containerPropsNotRequiredCustomizer",
334+
TestPulsarContainerPropertiesCustomizer.class,
335+
() -> containerProps -> containerProps.transactions().setEnabled(false));
344336
context.refresh();
345337
var container = context.getBean(PulsarListenerEndpointRegistry.class).getListenerContainer(LISTENER_ID);
346338
assertThat(container).isNotNull();
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package org.springframework.pulsar.listener;
2+
3+
import org.jspecify.annotations.Nullable;
4+
5+
import org.springframework.context.annotation.Bean;
6+
import org.springframework.pulsar.transaction.PulsarAwareTransactionManager;
7+
8+
/**
9+
* @author Chris Bono
10+
* @author Andrey Litvitski
11+
*/
12+
public class PulsarTxnTestsBase {
13+
14+
@FunctionalInterface
15+
public interface TestPulsarContainerPropertiesCustomizer {
16+
17+
void customize(PulsarContainerProperties containerProps);
18+
19+
}
20+
21+
@Bean
22+
PulsarContainerProperties pulsarContainerProperties(PulsarAwareTransactionManager pulsarTransactionManager,
23+
@Nullable TestPulsarContainerPropertiesCustomizer containerPropsCustomizer) {
24+
var containerProps = new PulsarContainerProperties();
25+
containerProps.transactions().setEnabled(true);
26+
containerProps.transactions().setRequired(false);
27+
containerProps.transactions().setTransactionManager(pulsarTransactionManager);
28+
if (containerPropsCustomizer != null) {
29+
containerPropsCustomizer.customize(containerProps);
30+
}
31+
return containerProps;
32+
}
33+
34+
}

0 commit comments

Comments
 (0)