章
目
录
RabbitMQ作为一款功能强大的消息队列中间件,与Spring Boot框架的整合相当常见。本文将详细介绍如何在Spring Boot项目中集成RabbitMQ,并深入探讨发布订阅、路由、通配符等多种工作模式的实现方式。
一、添加依赖与配置连接信息
在开始整合之前,首先要在Spring Boot项目的pom.xml
文件中添加RabbitMQ相关依赖,引入spring-boot-starter-amqp
依赖后,项目便能使用RabbitMQ的各项功能。具体依赖配置如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
接着,在application.properties
文件里配置RabbitMQ的连接信息,包括服务器地址、端口、用户名、密码、虚拟主机等。同时,还可以设置一些与消息处理相关的参数:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
# 配置消息确认模式为简单模式,消息成功处理后,RabbitMQ会发送确认信号
spring.rabbitmq.publisher-confirm-type=simple
# 配置监听器的最小并发数,启动时至少有3个消费者线程处理消息
spring.rabbitmq.listener.simple.concurrency=3
# 配置监听器的最大并发数,负载高时最多启动10个消费者线程
spring.rabbitmq.listener.simple.max-concurrency=10
二、发布订阅模式(Publish/Subscribe)
(一)模式原理
发布订阅模式下,需配置一个fanout
类型的交换器(Exchange)。这种交换器的特点是不依赖路由键(Routing key),它会将接收到的消息广播到与其绑定的每一个消息队列上。每个消息队列都能接收并存储相同的消息,然后由各自关联的消费者进行消费。
(二)代码实现
- 定义交换机、队列与绑定关系
可以借助AmqpAdmin
管理类来定义交换机、队列以及它们之间的绑定关系。以下是示例代码:
package com.xyu;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
@SpringBootTest
class SpringbootXyuProjectApplicationTests {
@Autowired
public AmqpAdmin amqpAdmin;
@Test
void contextLoads() {
}
@Test
public void amqpAdmin() {
// 定义一个fanout类型的交换机
amqpAdmin.declareExchange(new FanoutExchange("fanout_exchange"));
// 声明两个队列
amqpAdmin.declareQueue(new Queue("fanout_queue_email"));
amqpAdmin.declareQueue(new Queue("fanout_queue_sms"));
// 将队列与交换机进行绑定
amqpAdmin.declareBinding(new Binding(
"fanout_queue_email",
Binding.DestinationType.QUEUE,
"fanout_exchange",
"",
null
));
amqpAdmin.declareBinding(new Binding(
"fanout_queue_sms",
Binding.DestinationType.QUEUE,
"fanout_exchange",
"",
null
));
}
}
- 消息发送
创建实体类并进行序列化处理,使用RabbitTemplate
的convertAndSend
方法发送消息到指定的交换机。示例代码如下:
// 定义实体类User
package com.xyu.po;
public class User {
private String name;
private Integer age;
@Override
public String toString() {
return "User{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
}
// 配置消息转换器,这里使用Jackson2JsonMessageConverter将Java对象转换为JSON字符串
package com.xyu.Config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
// 发送消息
@Test
public void Publisher() {
User user = new User();
user.setAge(18);
user.setName("天天");
// 将消息发送到fanout_exchange交换机,路由键为空
rabbitTemplate.convertAndSend("fanout_exchange", "", user);
}
- 消息监听
使用@RabbitListener
注解标记消息接收方法,Spring Boot会自动监听指定队列中的消息并调用相应方法进行处理。示例代码如下:
package com.xyu.Service;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
public class RabbitMqListenerService {
@RabbitListener(queues = "fanout_queue_email")
public void psubConsumerEmail(Message message) {
byte[] body = message.getBody();
String s = new String(body);
System.out.println("邮件业务接收到消息: " + s);
}
@RabbitListener(queues = "fanout_queue_sms")
public void psubConsumerSms(Message message) {
byte[] body = message.getBody();
String s = new String(body);
System.out.println("短信业务接收到消息: " + s);
}
}
(三)基于配置类的发布订阅模式实现
除了上述方式,还可以通过配置类来创建交换机、队列和绑定关系,同时启用RabbitMQ的注解功能。示例代码如下:
package com.xyu.Config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableRabbit
// @EnableRabbit注解用于启用基于注解的RabbitMQ消息处理功能
public class RabbitMQConfig {
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("exchange_my");
}
@Bean
public Queue smsQueue() {
return new Queue("fanout_queue_tem", true); // 队列持久化
}
@Bean
public Queue emailQueue() {
return new Queue("fanout_queue_num", true); // 队列持久化
}
@Bean
public Binding smsBinding() {
return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
}
@Bean
public Binding emailBinding() {
return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
}
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
// 接收消息
@RabbitListener(bindings = @QueueBinding(value =
@Queue("fanout_queue_tem"), exchange =
@Exchange(value = "exchange_my", type = "fanout")))
public void psubConsumerEmailAno(User user) {
System.out.println("1邮件业务接收到消息: " + user);
}
@RabbitListener(bindings = @QueueBinding(value =
@Queue("fanout_queue_num"), exchange =
@Exchange(value = "exchange_my", type = "fanout")))
public void psubConsumerSmsAno(User user) {
System.out.println("1短信业务接收到消息: " + user);
}
// 发送消息
@Test
public void Publisher() {
User user = new User();
user.setAge(18);
user.setName("天天1");
// 向两个交换机发送相同消息
rabbitTemplate.convertAndSend("fanout_exchange", "", user);
rabbitTemplate.convertAndSend("exchange_my", "", user);
}
三、路由模式(Routing)
(一)模式原理
路由模式中,需配置一个direct
类型的交换器,并为不同的消息指定特定的路由键。交换器会根据路由键将消息准确地路由到与之匹配的消息队列中,消费者从各自对应的队列中获取消息进行消费。
(二)代码实现
- 定义交换机、队列与绑定关系
// 定义direct类型的交换机、队列和绑定关系
@Bean
public DirectExchange directExchange() {
return new DirectExchange("routing_exchange");
}
@Bean
public Queue routeQueue() {
return new Queue("routing_queue_all", true);
}
@Bean
public Binding routeBinding() {
return BindingBuilder.bind(routeQueue()).to(directExchange()).withQueueName();
}
- 消息发送与接收
// 发送消息
@Test
public void routingPublisher() {
rabbitTemplate.convertAndSend("routing_exchange",
"error_routing_key",
"routing send error message");
}
// 接收消息
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(name = "routing_queue_all"),
exchange = @Exchange(value = "routing_exchange", type = "direct"),
key = "error_routing_key")
)
public void routingConsumerError(String message) {
System.out.println("接收到error级别日志消息: " + message);
}
四、通配符模式(Topics)
(一)模式原理
通配符模式同样使用topic
类型的交换器,它在路由键匹配上更加灵活,支持使用通配符(#
和*
)。#
代表零个或多个单词,*
代表一个单词,通过这种方式可以实现更细粒度的消息路由。
(二)代码实现
- 定义交换机、队列与绑定关系
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topic_exchange");
}
@Bean
public Queue topic_queue_email() {
return new Queue("topic_queue_email", true);
}
@Bean
public BindingBuilder.TopicExchangeRoutingKeyConfigurer topicBinding() {
return BindingBuilder.bind(topic_queue_email()).to(topicExchange());
}
- 消息发送与接收
// 发送消息
@Test
public void topicPublisher() {
rabbitTemplate.convertAndSend("topic_exchange",
"info.email",
"topics send email message");
rabbitTemplate.convertAndSend("topic_exchange",
"info.sms",
"topics send sms message");
rabbitTemplate.convertAndSend("topic_exchange",
"info.email.sms",
"topics send email and sms message");
}
// 接收消息
@RabbitListener(bindings = @QueueBinding(value =
@Queue("topic_queue_email"), exchange =
@Exchange(value = "topic_exchange", type = "topic"),
key = "info.#.sms.#"))
public void topicConsumerSms(String message) {
System.out.println("接收到短信订阅需求处理消息: " + message);
}
通过以上详细的步骤和代码示例,我们可以在Spring Boot项目中顺利集成RabbitMQ,并灵活运用多种工作模式满足不同的业务需求。希望本文能帮助大家更好地理解和掌握Spring Boot与RabbitMQ的整合步骤。