Skip to content

Commit

Permalink
Refactor AMQP sample to use TestContainers for RabbitMQ
Browse files Browse the repository at this point in the history
Issue #4052
  • Loading branch information
fmbenhassine committed Sep 21, 2023
1 parent 008e582 commit e708bf2
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 58 deletions.
12 changes: 12 additions & 0 deletions spring-batch-samples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,18 @@
<version>${junit-jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>rabbitmq</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@

import javax.sql.DataSource;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
Expand All @@ -42,18 +39,11 @@
* Sample Configuration to demonstrate a simple reader and writer for AMQP.
*
* @author Glenn Renfro
* @author Mahmoud Ben Hassine
*/
@Configuration
@EnableBatchProcessing
public class AmqpConfiguration {

public final static String QUEUE_NAME = "rabbitmq.test.queue";

public final static String EXCHANGE_NAME = "rabbitmq.test.exchange";

private final static int amqpPort = 5672;

private final static String host = "127.0.0.1";
public class AmqpJobConfiguration {

@Bean
public Job job(JobRepository jobRepository, Step step) {
Expand Down Expand Up @@ -85,55 +75,24 @@ public JdbcTransactionManager transactionManager(DataSource dataSource) {

/**
* Reads from the designated queue.
* @param template the template to be used by the {@link ItemReader}.
* @param rabbitInputTemplate the template to be used by the {@link ItemReader}.
* @return instance of {@link ItemReader}.
*/
@Bean
public ItemReader<String> amqpItemReader(RabbitTemplate template) {
public ItemReader<String> amqpItemReader(RabbitTemplate rabbitInputTemplate) {
AmqpItemReaderBuilder<String> builder = new AmqpItemReaderBuilder<>();
return builder.amqpTemplate(template).build();
return builder.amqpTemplate(rabbitInputTemplate).build();
}

/**
* Reads from the designated destination.
* @param template the template to be used by the {@link ItemWriter}.
* @param rabbitOutputTemplate the template to be used by the {@link ItemWriter}.
* @return instance of {@link ItemWriter}.
*/
@Bean
public ItemWriter<String> amqpItemWriter(RabbitTemplate template) {
public ItemWriter<String> amqpItemWriter(RabbitTemplate rabbitOutputTemplate) {
AmqpItemWriterBuilder<String> builder = new AmqpItemWriterBuilder<>();
return builder.amqpTemplate(template).build();
}

/**
* @return {@link CachingConnectionFactory} to be used by the {@link AmqpTemplate}
*/
@Bean
public CachingConnectionFactory connectionFactory() {
return new CachingConnectionFactory(host, amqpPort);
}

/**
* @return {@link AmqpTemplate} to be used for the {@link ItemWriter}
*/
@Bean
public AmqpTemplate rabbitOutputTemplate(CachingConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
template.setExchange(EXCHANGE_NAME);
return template;
}

/**
* @return {@link AmqpTemplate} to be used for the {@link ItemReader}.
*/
@Bean
public RabbitTemplate rabbitInputTemplate() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, amqpPort);
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
template.setDefaultReceiveQueue(QUEUE_NAME);
return template;
return builder.amqpTemplate(rabbitOutputTemplate).build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,31 @@
package org.springframework.batch.sample;

import org.junit.jupiter.api.Test;
import org.testcontainers.containers.RabbitMQContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.sample.amqp.AmqpConfiguration;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.sample.amqp.AmqpJobConfiguration;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -52,8 +63,14 @@

@SpringJUnitConfig(
locations = { "/simple-job-launcher-context.xml", "/jobs/amqp-example-job.xml", "/job-runner-context.xml" })
@Testcontainers(disabledWithoutDocker = true)
class AMQPJobFunctionalTests {

private static final DockerImageName RABBITMQ_IMAGE = DockerImageName.parse("rabbitmq:3");

@Container
public static RabbitMQContainer rabbitmq = new RabbitMQContainer(RABBITMQ_IMAGE);

@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;

Expand All @@ -75,8 +92,8 @@ void testLaunchJobWithXmlConfig() throws Exception {
@Test
public void testLaunchJobWithJavaConfig() throws Exception {
// given
ApplicationContext context = new AnnotationConfigApplicationContext(AmqpConfiguration.class);
initializeExchange(context.getBean(CachingConnectionFactory.class));
ApplicationContext context = new AnnotationConfigApplicationContext(AmqpJobConfiguration.class,
AmqpConfiguration.class);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);

Expand All @@ -89,12 +106,50 @@ public void testLaunchJobWithJavaConfig() throws Exception {
assertTrue(count > 0);
}

private void initializeExchange(CachingConnectionFactory connectionFactory) {
AmqpAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareQueue(new Queue(AmqpConfiguration.QUEUE_NAME));
admin.declareExchange(new TopicExchange(AmqpConfiguration.EXCHANGE_NAME));
admin.declareBinding(new Binding(AmqpConfiguration.QUEUE_NAME, Binding.DestinationType.QUEUE,
AmqpConfiguration.EXCHANGE_NAME, "#", null));
@Configuration
static class AmqpConfiguration {

public final static String QUEUE_NAME = "rabbitmq.test.queue";

public final static String EXCHANGE_NAME = "rabbitmq.test.exchange";

/**
* @return {@link CachingConnectionFactory} to be used by the {@link AmqpTemplate}
*/
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitmq.getHost(),
rabbitmq.getAmqpPort());
AmqpAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareQueue(new Queue(AmqpConfiguration.QUEUE_NAME));
admin.declareExchange(new TopicExchange(AmqpConfiguration.EXCHANGE_NAME));
admin.declareBinding(new Binding(AmqpConfiguration.QUEUE_NAME, Binding.DestinationType.QUEUE,
AmqpConfiguration.EXCHANGE_NAME, "#", null));
return connectionFactory;
}

/**
* @return {@link AmqpTemplate} to be used for the {@link ItemWriter}
*/
@Bean
public AmqpTemplate rabbitOutputTemplate(CachingConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
template.setExchange(EXCHANGE_NAME);
return template;
}

/**
* @return {@link AmqpTemplate} to be used for the {@link ItemReader}.
*/
@Bean
public RabbitTemplate rabbitInputTemplate(CachingConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
template.setDefaultReceiveQueue(QUEUE_NAME);
return template;
}

}

}

0 comments on commit e708bf2

Please sign in to comment.