From e708bf23a1ddb20166c5082acb321cf1a6f51a51 Mon Sep 17 00:00:00 2001 From: Mahmoud Ben Hassine Date: Thu, 21 Sep 2023 12:54:35 +0200 Subject: [PATCH] Refactor AMQP sample to use TestContainers for RabbitMQ Issue #4052 --- spring-batch-samples/pom.xml | 12 +++ ...uration.java => AmqpJobConfiguration.java} | 57 ++------------- .../batch/sample/AMQPJobFunctionalTests.java | 73 ++++++++++++++++--- 3 files changed, 84 insertions(+), 58 deletions(-) rename spring-batch-samples/src/main/java/org/springframework/batch/sample/amqp/{AmqpConfiguration.java => AmqpJobConfiguration.java} (62%) diff --git a/spring-batch-samples/pom.xml b/spring-batch-samples/pom.xml index 0f2962a53f..4f85457072 100644 --- a/spring-batch-samples/pom.xml +++ b/spring-batch-samples/pom.xml @@ -187,6 +187,18 @@ ${junit-jupiter.version} test + + org.testcontainers + junit-jupiter + ${testcontainers.version} + test + + + org.testcontainers + rabbitmq + ${testcontainers.version} + test + org.hamcrest hamcrest-library diff --git a/spring-batch-samples/src/main/java/org/springframework/batch/sample/amqp/AmqpConfiguration.java b/spring-batch-samples/src/main/java/org/springframework/batch/sample/amqp/AmqpJobConfiguration.java similarity index 62% rename from spring-batch-samples/src/main/java/org/springframework/batch/sample/amqp/AmqpConfiguration.java rename to spring-batch-samples/src/main/java/org/springframework/batch/sample/amqp/AmqpJobConfiguration.java index 9939103d83..173b8fcbda 100644 --- a/spring-batch-samples/src/main/java/org/springframework/batch/sample/amqp/AmqpConfiguration.java +++ b/spring-batch-samples/src/main/java/org/springframework/batch/sample/amqp/AmqpJobConfiguration.java @@ -18,10 +18,7 @@ import javax.sql.DataSource; -import org.springframework.amqp.core.AmqpTemplate; -import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; @@ -42,18 +39,11 @@ * Sample Configuration to demonstrate a simple reader and writer for AMQP. * * @author Glenn Renfro + * @author Mahmoud Ben Hassine */ @Configuration @EnableBatchProcessing -public class AmqpConfiguration { - - public final static String QUEUE_NAME = "rabbitmq.test.queue"; - - public final static String EXCHANGE_NAME = "rabbitmq.test.exchange"; - - private final static int amqpPort = 5672; - - private final static String host = "127.0.0.1"; +public class AmqpJobConfiguration { @Bean public Job job(JobRepository jobRepository, Step step) { @@ -85,55 +75,24 @@ public JdbcTransactionManager transactionManager(DataSource dataSource) { /** * Reads from the designated queue. - * @param template the template to be used by the {@link ItemReader}. + * @param rabbitInputTemplate the template to be used by the {@link ItemReader}. * @return instance of {@link ItemReader}. */ @Bean - public ItemReader amqpItemReader(RabbitTemplate template) { + public ItemReader amqpItemReader(RabbitTemplate rabbitInputTemplate) { AmqpItemReaderBuilder builder = new AmqpItemReaderBuilder<>(); - return builder.amqpTemplate(template).build(); + return builder.amqpTemplate(rabbitInputTemplate).build(); } /** * Reads from the designated destination. - * @param template the template to be used by the {@link ItemWriter}. + * @param rabbitOutputTemplate the template to be used by the {@link ItemWriter}. * @return instance of {@link ItemWriter}. */ @Bean - public ItemWriter amqpItemWriter(RabbitTemplate template) { + public ItemWriter amqpItemWriter(RabbitTemplate rabbitOutputTemplate) { AmqpItemWriterBuilder builder = new AmqpItemWriterBuilder<>(); - return builder.amqpTemplate(template).build(); - } - - /** - * @return {@link CachingConnectionFactory} to be used by the {@link AmqpTemplate} - */ - @Bean - public CachingConnectionFactory connectionFactory() { - return new CachingConnectionFactory(host, amqpPort); - } - - /** - * @return {@link AmqpTemplate} to be used for the {@link ItemWriter} - */ - @Bean - public AmqpTemplate rabbitOutputTemplate(CachingConnectionFactory connectionFactory) { - RabbitTemplate template = new RabbitTemplate(connectionFactory); - template.setMessageConverter(new Jackson2JsonMessageConverter()); - template.setExchange(EXCHANGE_NAME); - return template; - } - - /** - * @return {@link AmqpTemplate} to be used for the {@link ItemReader}. - */ - @Bean - public RabbitTemplate rabbitInputTemplate() { - CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, amqpPort); - RabbitTemplate template = new RabbitTemplate(connectionFactory); - template.setMessageConverter(new Jackson2JsonMessageConverter()); - template.setDefaultReceiveQueue(QUEUE_NAME); - return template; + return builder.amqpTemplate(rabbitOutputTemplate).build(); } } diff --git a/spring-batch-samples/src/test/java/org/springframework/batch/sample/AMQPJobFunctionalTests.java b/spring-batch-samples/src/test/java/org/springframework/batch/sample/AMQPJobFunctionalTests.java index e56c1e530a..3224bf89ca 100644 --- a/spring-batch-samples/src/test/java/org/springframework/batch/sample/AMQPJobFunctionalTests.java +++ b/spring-batch-samples/src/test/java/org/springframework/batch/sample/AMQPJobFunctionalTests.java @@ -16,20 +16,31 @@ package org.springframework.batch.sample; import org.junit.jupiter.api.Test; +import org.testcontainers.containers.RabbitMQContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; import org.springframework.amqp.core.AmqpAdmin; +import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.explore.JobExplorer; import org.springframework.batch.core.launch.JobLauncher; -import org.springframework.batch.sample.amqp.AmqpConfiguration; +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.sample.amqp.AmqpJobConfiguration; import org.springframework.batch.test.JobLauncherTestUtils; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -52,8 +63,14 @@ @SpringJUnitConfig( locations = { "/simple-job-launcher-context.xml", "/jobs/amqp-example-job.xml", "/job-runner-context.xml" }) +@Testcontainers(disabledWithoutDocker = true) class AMQPJobFunctionalTests { + private static final DockerImageName RABBITMQ_IMAGE = DockerImageName.parse("rabbitmq:3"); + + @Container + public static RabbitMQContainer rabbitmq = new RabbitMQContainer(RABBITMQ_IMAGE); + @Autowired private JobLauncherTestUtils jobLauncherTestUtils; @@ -75,8 +92,8 @@ void testLaunchJobWithXmlConfig() throws Exception { @Test public void testLaunchJobWithJavaConfig() throws Exception { // given - ApplicationContext context = new AnnotationConfigApplicationContext(AmqpConfiguration.class); - initializeExchange(context.getBean(CachingConnectionFactory.class)); + ApplicationContext context = new AnnotationConfigApplicationContext(AmqpJobConfiguration.class, + AmqpConfiguration.class); JobLauncher jobLauncher = context.getBean(JobLauncher.class); Job job = context.getBean(Job.class); @@ -89,12 +106,50 @@ public void testLaunchJobWithJavaConfig() throws Exception { assertTrue(count > 0); } - private void initializeExchange(CachingConnectionFactory connectionFactory) { - AmqpAdmin admin = new RabbitAdmin(connectionFactory); - admin.declareQueue(new Queue(AmqpConfiguration.QUEUE_NAME)); - admin.declareExchange(new TopicExchange(AmqpConfiguration.EXCHANGE_NAME)); - admin.declareBinding(new Binding(AmqpConfiguration.QUEUE_NAME, Binding.DestinationType.QUEUE, - AmqpConfiguration.EXCHANGE_NAME, "#", null)); + @Configuration + static class AmqpConfiguration { + + public final static String QUEUE_NAME = "rabbitmq.test.queue"; + + public final static String EXCHANGE_NAME = "rabbitmq.test.exchange"; + + /** + * @return {@link CachingConnectionFactory} to be used by the {@link AmqpTemplate} + */ + @Bean + public CachingConnectionFactory connectionFactory() { + CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitmq.getHost(), + rabbitmq.getAmqpPort()); + AmqpAdmin admin = new RabbitAdmin(connectionFactory); + admin.declareQueue(new Queue(AmqpConfiguration.QUEUE_NAME)); + admin.declareExchange(new TopicExchange(AmqpConfiguration.EXCHANGE_NAME)); + admin.declareBinding(new Binding(AmqpConfiguration.QUEUE_NAME, Binding.DestinationType.QUEUE, + AmqpConfiguration.EXCHANGE_NAME, "#", null)); + return connectionFactory; + } + + /** + * @return {@link AmqpTemplate} to be used for the {@link ItemWriter} + */ + @Bean + public AmqpTemplate rabbitOutputTemplate(CachingConnectionFactory connectionFactory) { + RabbitTemplate template = new RabbitTemplate(connectionFactory); + template.setMessageConverter(new Jackson2JsonMessageConverter()); + template.setExchange(EXCHANGE_NAME); + return template; + } + + /** + * @return {@link AmqpTemplate} to be used for the {@link ItemReader}. + */ + @Bean + public RabbitTemplate rabbitInputTemplate(CachingConnectionFactory connectionFactory) { + RabbitTemplate template = new RabbitTemplate(connectionFactory); + template.setMessageConverter(new Jackson2JsonMessageConverter()); + template.setDefaultReceiveQueue(QUEUE_NAME); + return template; + } + } }