<!-- 整合 Spring Boot Starter ActiveMQ -->
<!-- 间接依赖:
spring jms
jms api
activemq
spring boot jms
-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
$ brew install apache-activemq
$ activemq console
请注意启动后的控制台输出:
INFO | Listening for connections at: tcp://Mercy-MacBook-Pro.local:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600
其中 tcp://Mercy-MacBook-Pro.local:61616
就是 broker URL,请注意将主机名转换成 localhost:
tcp://localhost:61616
/**
 * Sends one text message ("Hello,World") to the queue named "TEST" on the
 * ActiveMQ broker at tcp://localhost:61616.
 *
 * <p>The producer, session and connection are closed in {@code finally} blocks so
 * that they are released even when creating or sending the message fails.
 *
 * @throws Exception if connecting to the broker, or creating/sending the message, fails
 */
private static void sendMessage() throws Exception {
    // Create the ActiveMQ connection factory for the broker URL
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    // Open the JMS connection
    Connection connection = connectionFactory.createConnection();
    try {
        // Create a non-transacted, auto-acknowledging session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            // Message destination: the queue named "TEST"
            Destination destination = session.createQueue("TEST");
            // Create the message producer
            MessageProducer producer = session.createProducer(destination);
            try {
                // Build the text message
                ActiveMQTextMessage message = new ActiveMQTextMessage();
                message.setText("Hello,World");
                // Send the text message
                producer.send(message);
            } finally {
                // Close the message producer
                producer.close();
            }
        } finally {
            // Close the session
            session.close();
        }
    } finally {
        // Close the connection
        connection.close();
    }
}
/**
 * Receives at most one message from the queue named "TEST" on the ActiveMQ broker
 * at tcp://localhost:61616, waiting up to 100 ms, and prints its text when it is a
 * {@link TextMessage}.
 *
 * <p>The consumer, session and connection are closed in {@code finally} blocks so
 * that they are released even when receiving fails.
 *
 * @throws Exception if connecting to the broker or receiving the message fails
 */
private static void receiveMessage() throws Exception {
    // Create the ActiveMQ connection factory for the broker URL
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    // Open the JMS connection
    Connection connection = connectionFactory.createConnection();
    try {
        // Start the connection so that message delivery begins
        connection.start();
        // Create a non-transacted, auto-acknowledging session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            // Message destination: the queue named "TEST"
            Destination destination = session.createQueue("TEST");
            // Create the message consumer
            MessageConsumer messageConsumer = session.createConsumer(destination);
            try {
                // Wait up to 100 ms for a message; instanceof is false when none arrived (null)
                Message message = messageConsumer.receive(100);
                if (message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    System.out.println("消息消费内容:" + textMessage.getText());
                }
            } finally {
                // Close the message consumer
                messageConsumer.close();
            }
        } finally {
            // Close the session
            session.close();
        }
        // Stop delivery before the connection is closed (normal path only)
        connection.stop();
    } finally {
        // Close the connection
        connection.close();
    }
}
<!-- 整合 Spring Boot Starter ActiveMQ -->
<!-- 间接依赖:
spring jms
jms api
activemq
spring boot jms
-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
application.properties
## ActiveMQ 配置
spring.activemq.brokerUrl = tcp://localhost:61616
application.properties
## JMS 配置
spring.jms.template.defaultDestination = sf-users-activemq
UserServiceClientController.java
@Autowired
private JmsTemplate jmsTemplate;
/**
 * Publishes the posted user as a JMS message via the injected {@code JmsTemplate}.
 *
 * <p>No destination is passed, so the template's default destination is used.
 *
 * @param user the user taken from the request body
 * @return always {@code true} once the send call has returned
 * @throws Exception declared by the original signature
 */
@PostMapping("/user/save/message/activemq")
public boolean saveUserByActiveMQMessage(@RequestBody User user) throws Exception {
    final User payload = user;
    // Convert the payload with the template's message converter and send it
    jmsTemplate.convertAndSend(payload);
    return true;
}
预先启动 "eureka-server" 以及 "config-server"
提示:重复 ActiveMQ 配置 以及 JMS 配置
@Autowired
private JmsTemplate jmsTemplate;
/**
 * Receives one message from the {@code JmsTemplate}'s default destination and
 * returns its converted payload.
 *
 * <p>NOTE(review): how long this call waits for a message depends on the template's
 * receive timeout, which is configured outside this snippet - confirm.
 *
 * @return the converted message payload, as returned by {@code receiveAndConvert()}
 */
@GetMapping("/user/poll")
public Object pollUser() {
    // Reads from the default destination (sf-users-activemq)
    Object received = jmsTemplate.receiveAndConvert();
    return received;
}
<!-- 整合 Spring Boot Starter ActiveMQ -->
<!-- 间接依赖:
spring jms
jms api
activemq
spring boot jms
-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
package com.springcloud.stream.binder.activemq;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.util.Assert;
/**
 * ActiveMQ implementation of a Spring Cloud Stream {@link Binder} for
 * {@link MessageChannel}s, backed by a {@link JmsTemplate}.
 *
 * @author <a href="mailto:[email protected]">Finen</a>
 */
public class ActiveMQMessageChannelBinder implements
        Binder<MessageChannel, ConsumerProperties, ProducerProperties> {

    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     * Binds the consumer (inbound) side. This version does not consume anything
     * from ActiveMQ and returns a binding whose unbind action does nothing.
     *
     * @param name               destination name
     * @param group              consumer group (unused)
     * @param inboundBindTarget  channel that would receive inbound messages (unused)
     * @param consumerProperties consumer properties (unused)
     * @return a no-op binding
     */
    @Override
    public Binding<MessageChannel> bindConsumer(String name, String group, MessageChannel inboundBindTarget, ConsumerProperties consumerProperties) {
        return () -> {
        };
    }

    /**
     * Binds the producer (outbound) side: every message sent to
     * {@code outboundBindTarget} is forwarded to the ActiveMQ destination
     * {@code name} through the {@link JmsTemplate}.
     *
     * @param name               destination name the payloads are sent to
     * @param outboundBindTarget outbound channel; must be a {@link SubscribableChannel}
     * @param producerProperties producer properties (unused)
     * @return a binding whose unbind action detaches the forwarding handler
     * @throws IllegalArgumentException if {@code outboundBindTarget} is not a
     *                                  {@link SubscribableChannel}
     */
    @Override
    public Binding<MessageChannel> bindProducer(String name, MessageChannel outboundBindTarget, ProducerProperties producerProperties) {
        Assert.isInstanceOf(SubscribableChannel.class, outboundBindTarget,
                "Binding is Supported only for subscribableChannel instances");
        SubscribableChannel subscribableChannel = (SubscribableChannel) outboundBindTarget;
        // Receives messages passed to MessageChannel#send(message) on the internal channel
        // and relays their payload to the ActiveMQ broker.
        MessageHandler forwarder = message -> {
            Object messageBody = message.getPayload();
            jmsTemplate.convertAndSend(name, messageBody);
        };
        subscribableChannel.subscribe(forwarder);
        return () -> {
            System.out.println("Unbinding");
            // Detach the handler so that nothing is forwarded after unbinding
            subscribableChannel.unsubscribe(forwarder);
        };
    }
}
package com.springcloud.stream.binder.activemq;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Auto-configuration for the ActiveMQ Stream Binder. It is conditional on no
 * {@link Binder} bean being present, and contributes an
 * {@link ActiveMQMessageChannelBinder} bean.
 *
 * @author <a href="mailto:[email protected]">Finen</a>
 * @see org.springframework.cloud.stream.binder.rabbit.config.RabbitMessageChannelBinderConfiguration
 * @since 0.0.1
 */
@Configuration
@ConditionalOnMissingBean(Binder.class)
public class ActiveMQStreamBinderAutoConfiguration {

    /**
     * Creates the ActiveMQ message channel binder bean.
     *
     * @return a new {@link ActiveMQMessageChannelBinder}
     */
    @Bean
    public ActiveMQMessageChannelBinder activeMQMessageChannelBinder() {
        ActiveMQMessageChannelBinder binder = new ActiveMQMessageChannelBinder();
        return binder;
    }
}
activemq :\
com.springcloud.stream.binder.activemq.ActiveMQStreamBinderAutoConfiguration
<!-- 引入 Active MQ Spring Cloud Stream Binder 实现 -->
<dependency>
<groupId>com.segumentfault</groupId>
<artifactId>spring-cloud-stream-binder-activemq</artifactId>
<version>${project.version}</version>
</dependency>
## Spring Cloud 默认 Binder
spring.cloud.stream.default-binder=activemq
## 给消息管道配置activemq-out配置
spring.cloud.stream.bindings.activemq-out.binder=activemq
spring.cloud.stream.bindings.activemq-out.destination=users-activemq
/**
 * Binds the consumer (inbound) side: listens on the ActiveMQ queue {@code name}
 * and forwards every {@code ActiveMQBytesMessage} it receives, wrapped in a
 * {@code GenericMessage}, to {@code inputChannel}.
 *
 * <p>The JMS connection opened here is closed when the returned binding is
 * unbound, and also when setup fails part-way. A {@code JMSException} during setup
 * is printed, not rethrown; the returned binding then has nothing to close.
 *
 * @param name               name of the queue to consume from
 * @param group              consumer group (unused)
 * @param inputChannel       channel that receives the wrapped JMS messages
 * @param consumerProperties consumer properties (unused)
 * @return a binding whose unbind action closes the JMS connection
 */
@Override
public Binding<MessageChannel> bindConsumer(String name, String group, MessageChannel inputChannel, ConsumerProperties consumerProperties) {
    // Reuse the connection factory configured on the JmsTemplate
    ConnectionFactory connectionFactory = jmsTemplate.getConnectionFactory();
    Connection opened = null;
    try {
        // Open the JMS connection
        opened = connectionFactory.createConnection();
        // Start the connection so that message delivery begins
        opened.start();
        // Create a non-transacted, auto-acknowledging session
        Session session = opened.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Message destination: the queue named after the binding
        Destination destination = session.createQueue(name);
        // Create the message consumer
        MessageConsumer messageConsumer = session.createConsumer(destination);
        messageConsumer.setMessageListener(message -> {
            // message comes from ActiveMQ; only ActiveMQBytesMessage is forwarded
            if (message instanceof ActiveMQBytesMessage) {
                System.out.println("**********************");
                inputChannel.send(new GenericMessage<>(message));
            } else {
                System.out.println("===================");
            }
        });
    } catch (JMSException e) {
        e.printStackTrace();
        // Do not leave a half-initialised connection open
        closeJmsConnection(opened);
        opened = null;
    }
    final Connection connection = opened;
    // Closing the connection also closes the session and consumer created from it
    return () -> closeJmsConnection(connection);
}

/** Closes the given JMS connection if it is non-null, printing any close failure. */
private static void closeJmsConnection(Connection connection) {
    if (connection == null) {
        return;
    }
    try {
        connection.close();
    } catch (JMSException e) {
        e.printStackTrace();
    }
}
<!-- 引入 Active MQ Spring Cloud Stream Binder 实现 -->
<dependency>
<groupId>com.segumentfault</groupId>
<artifactId>spring-cloud-stream-binder-activemq</artifactId>
<version>${project.version}</version>
</dependency>
## Spring Cloud 默认 Binder
spring.cloud.stream.default-binder=rabbit
## 给消息管道配置activemq-in配置
spring.cloud.stream.bindings.activemq-in.binder=activemq
spring.cloud.stream.bindings.activemq-in.destination=users-activemq
/**
 * Subscribes a handler to the ActiveMQ input channel after construction.
 *
 * <p>For each {@code GenericMessage} the handler reads the {@code contentType}
 * header. When it is {@code application/json}, the payload is treated as an
 * {@code ActiveMQBytesMessage} whose content bytes are decoded as UTF-8 and passed
 * to {@code saveUser(String)}. Otherwise the payload is cast to {@code byte[]} and
 * passed to {@code saveUser(byte[])}.
 */
@PostConstruct
public void init() {
    System.out.println("Subscribe by @SubscribableChannel");
    // Listen on the ActiveMQ stream
    userMessage.activeMQIn().subscribe(message -> {
        if (message instanceof GenericMessage) {
            // The message body is a byte stream; branch on the content type
            String contentType = String.valueOf(message.getHeaders().get("contentType"));
            if ("application/json".equals(contentType)) {
                try {
                    ActiveMQBytesMessage activeMQBytesMessage = (ActiveMQBytesMessage) message.getPayload();
                    org.apache.activemq.util.ByteSequence content = activeMQBytesMessage.getContent();
                    // Honour the sequence's offset/length and decode explicitly as UTF-8
                    // rather than with the platform default charset.
                    String json = new String(content.getData(), content.getOffset(), content.getLength(),
                            java.nio.charset.StandardCharsets.UTF_8);
                    saveUser(json);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            } else {
                // NOTE(review): assumes the payload is a raw byte[] whenever the content type
                // is not JSON - confirm against what the binder actually puts on this channel.
                byte[] body = (byte[]) message.getPayload();
                saveUser(body);
            }
        }
    });
}