Spring Boot如何整合RabbitMQ?订阅、路由、通配符3种工作模式的实现方式

后端 潘老师 4周前 (03-28) 64 ℃ (0) 扫码查看

RabbitMQ作为一款功能强大的消息队列中间件,与Spring Boot框架的整合相当常见。本文将详细介绍如何在Spring Boot项目中集成RabbitMQ,并深入探讨发布订阅、路由、通配符等多种工作模式的实现方式。

一、添加依赖与配置连接信息

在开始整合之前,首先要在Spring Boot项目的pom.xml文件中添加RabbitMQ相关依赖,引入spring-boot-starter-amqp依赖后,项目便能使用RabbitMQ的各项功能。具体依赖配置如下:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

接着,在application.properties文件里配置RabbitMQ的连接信息,包括服务器地址、端口、用户名、密码、虚拟主机等。同时,还可以设置一些与消息处理相关的参数:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
# 配置消息确认模式为简单模式,消息成功处理后,RabbitMQ会发送确认信号
spring.rabbitmq.publisher-confirm-type=simple 
# 配置监听器的最小并发数,启动时至少有3个消费者线程处理消息
spring.rabbitmq.listener.simple.concurrency=3 
# 配置监听器的最大并发数,负载高时最多启动10个消费者线程
spring.rabbitmq.listener.simple.max-concurrency=10 

二、发布订阅模式(Publish/Subscribe)

(一)模式原理

发布订阅模式下,需配置一个fanout类型的交换器(Exchange)。这种交换器的特点是不依赖路由键(Routing key),它会将接收到的消息广播到与其绑定的每一个消息队列上。每个消息队列都能接收并存储相同的消息,然后由各自关联的消费者进行消费。

(二)代码实现

  1. 定义交换机、队列与绑定关系
    可以借助AmqpAdmin管理类来定义交换机、队列以及它们之间的绑定关系。以下是示例代码:
package com.xyu;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;

@SpringBootTest
class SpringbootXyuProjectApplicationTests {
    @Autowired
    public AmqpAdmin amqpAdmin;

    @Test
    void contextLoads() {
    }

    @Test
    public void amqpAdmin() {
        // 定义一个fanout类型的交换机
        amqpAdmin.declareExchange(new FanoutExchange("fanout_exchange")); 
        // 声明两个队列
        amqpAdmin.declareQueue(new Queue("fanout_queue_email")); 
        amqpAdmin.declareQueue(new Queue("fanout_queue_sms")); 
        // 将队列与交换机进行绑定
        amqpAdmin.declareBinding(new Binding(
                "fanout_queue_email",
                Binding.DestinationType.QUEUE,
                "fanout_exchange",
                "",
                null
        ));
        amqpAdmin.declareBinding(new Binding(
                "fanout_queue_sms",
                Binding.DestinationType.QUEUE,
                "fanout_exchange",
                "",
                null
        ));
    }
}
  1. 消息发送
    创建实体类并进行序列化处理,使用RabbitTemplateconvertAndSend方法发送消息到指定的交换机。示例代码如下:
// 定义实体类User
package com.xyu.po;
public class User {
    private String name;
    private Integer age;

    @Override
    public String toString() {
        return "User{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}';
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }
}
// 配置消息转换器,这里使用Jackson2JsonMessageConverter将Java对象转换为JSON字符串
package com.xyu.Config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
// 发送消息
@Test
public void Publisher() {
    User user = new User();
    user.setAge(18);
    user.setName("天天");
    // 将消息发送到fanout_exchange交换机,路由键为空
    rabbitTemplate.convertAndSend("fanout_exchange", "", user); 
}
  1. 消息监听
    使用@RabbitListener注解标记消息接收方法,Spring Boot会自动监听指定队列中的消息并调用相应方法进行处理。示例代码如下:
package com.xyu.Service;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class RabbitMqListenerService {
    @RabbitListener(queues = "fanout_queue_email")
    public void psubConsumerEmail(Message message) {
        byte[] body = message.getBody();
        String s = new String(body);
        System.out.println("邮件业务接收到消息: " + s);
    }

    @RabbitListener(queues = "fanout_queue_sms")
    public void psubConsumerSms(Message message) {
        byte[] body = message.getBody();
        String s = new String(body);
        System.out.println("短信业务接收到消息: " + s);
    }
}

(三)基于配置类的发布订阅模式实现

除了上述方式,还可以通过配置类来创建交换机、队列和绑定关系,同时启用RabbitMQ的注解功能。示例代码如下:

package com.xyu.Config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableRabbit
// @EnableRabbit注解用于启用基于注解的RabbitMQ消息处理功能
public class RabbitMQConfig {
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("exchange_my");
    }

    @Bean
    public Queue smsQueue() {
        return new Queue("fanout_queue_tem", true); // 队列持久化
    }

    @Bean
    public Queue emailQueue() {
        return new Queue("fanout_queue_num", true); // 队列持久化
    }

    @Bean
    public Binding smsBinding() {
        return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding emailBinding() {
        return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
    }

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
// 接收消息
@RabbitListener(bindings = @QueueBinding(value =
        @Queue("fanout_queue_tem"), exchange =
        @Exchange(value = "exchange_my", type = "fanout")))
public void psubConsumerEmailAno(User user) {
    System.out.println("1邮件业务接收到消息: " + user);
}

@RabbitListener(bindings = @QueueBinding(value =
        @Queue("fanout_queue_num"), exchange =
        @Exchange(value = "exchange_my", type = "fanout")))
public void psubConsumerSmsAno(User user) {
    System.out.println("1短信业务接收到消息: " + user);
}
// 发送消息
@Test
public void Publisher() {
    User user = new User();
    user.setAge(18);
    user.setName("天天1");
    // 向两个交换机发送相同消息
    rabbitTemplate.convertAndSend("fanout_exchange", "", user); 
    rabbitTemplate.convertAndSend("exchange_my", "", user);
}

三、路由模式(Routing)

(一)模式原理

路由模式中,需配置一个direct类型的交换器,并为不同的消息指定特定的路由键。交换器会根据路由键将消息准确地路由到与之匹配的消息队列中,消费者从各自对应的队列中获取消息进行消费。

(二)代码实现

  1. 定义交换机、队列与绑定关系
// 定义direct类型的交换机、队列和绑定关系
@Bean
public DirectExchange directExchange() {
    return new DirectExchange("routing_exchange");
}

@Bean
public Queue routeQueue() {
    return new Queue("routing_queue_all", true);
}

@Bean
public Binding routeBinding() {
    return BindingBuilder.bind(routeQueue()).to(directExchange()).withQueueName();
}
  1. 消息发送与接收
// 发送消息
@Test
public void routingPublisher() {
    rabbitTemplate.convertAndSend("routing_exchange",
            "error_routing_key",
            "routing send error message");
}
// 接收消息
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(name = "routing_queue_all"),
                exchange = @Exchange(value = "routing_exchange", type = "direct"),
                key = "error_routing_key")
)
public void routingConsumerError(String message) {
    System.out.println("接收到error级别日志消息: " + message);
}

四、通配符模式(Topics)

(一)模式原理

通配符模式同样使用topic类型的交换器,它在路由键匹配上更加灵活,支持使用通配符(#*)。#代表零个或多个单词,*代表一个单词,通过这种方式可以实现更细粒度的消息路由。

(二)代码实现

  1. 定义交换机、队列与绑定关系
@Bean
public TopicExchange topicExchange() {
    return new TopicExchange("topic_exchange");
}

@Bean
public Queue topic_queue_email() {
    return new Queue("topic_queue_email", true);
}

@Bean
public BindingBuilder.TopicExchangeRoutingKeyConfigurer topicBinding() {
    return BindingBuilder.bind(topic_queue_email()).to(topicExchange());
}
  1. 消息发送与接收
// 发送消息
@Test
public void topicPublisher() {
    rabbitTemplate.convertAndSend("topic_exchange",
            "info.email",
            "topics send email message");
    rabbitTemplate.convertAndSend("topic_exchange",
            "info.sms",
            "topics send sms message");
    rabbitTemplate.convertAndSend("topic_exchange",
            "info.email.sms",
            "topics send email and sms message");
}
// 接收消息
@RabbitListener(bindings = @QueueBinding(value =
        @Queue("topic_queue_email"), exchange =
        @Exchange(value = "topic_exchange", type = "topic"),
        key = "info.#.sms.#"))
public void topicConsumerSms(String message) {
    System.out.println("接收到短信订阅需求处理消息: " + message);
}

通过以上详细的步骤和代码示例,我们可以在Spring Boot项目中顺利集成RabbitMQ,并灵活运用多种工作模式满足不同的业务需求。希望本文能帮助大家更好地理解和掌握Spring Boot与RabbitMQ的整合步骤。


版权声明:本站文章,如无说明,均为本站原创,转载请注明文章来源。如有侵权,请联系博主删除。
本文链接:https://www.panziye.com/back/16408.html
喜欢 (0)
请潘老师喝杯Coffee吧!】
分享 (0)
用户头像
发表我的评论
取消评论
表情 贴图 签到 代码

Hi,您需要填写昵称和邮箱!

  • 昵称【必填】
  • 邮箱【必填】
  • 网址【可选】