消息队列死信策略详解及相关基础概念

后端 潘老师 1周前 (04-16) 11 ℃ (0) 扫码查看

现代分布式系统里,消息队列是个极为关键的组件,它能实现系统之间的异步通信,还能起到解耦的作用。本文会用通俗易懂的语言,为大家介绍消息队列中的死信策略,同时拓展相关基础知识,文中还配有示例代码,方便国内开发者学习和理解。

一、死信消息与死信队列的概念

(一)死信消息

死信消息,简单来说,就是那些消费者没办法正常处理的消息。像消息格式不对、数据出现异常、消费失败即便重试多次也没成功,或者消息过了有效期等情况,这些消息就会成为死信消息。

(二)死信队列

死信队列(Dead Letter Queue,简称DLQ),是专门用来存放死信消息的队列。一旦死信消息产生,就会被转移到死信队列里,这样就不会影响正常的业务流程了。

二、死信消息产生的常见原因

  1. 消息被拒绝:当消费者调用拒绝接口,并且选择不把消息重新放回队列时,这条消息就会变成死信消息。
  2. 消息过期:如果消息设置了存活时间(TTL),在这个时间内一直没被消费,时间一到,消息就过期了,也就成了死信消息。
  3. 队列满了:队列都有最大长度限制,当达到这个上限后,新的消息就无法再进入队列,这些无法入队的消息也可能成为死信消息。
  4. 消息格式或内容异常:消息的格式或者内容出现问题,导致消费者没办法处理,这样的消息也会变成死信消息。

三、死信策略的核心内容

  1. 消息转为死信条件:常见的情况有消费失败且重试次数达到上限、被消费者拒绝且不重试、消息过期、消息格式异常等。
  2. 死信消息处理方式:死信消息不会再被正常的消费者消费,而是会被转移到死信队列中,这样后续可以进行人工处理或者通过程序自动处理。
  3. 死信消息保存时间:一般来说,死信消息的保存时间和正常消息是一样的,比如设置为3天。而且消息进入死信队列后,保存时间会重新开始计算。
  4. 配置建议:为了避免死信消息循环,死信队列最好和业务队列分开设置。同时,要注意监控死信消息的相关指标,这样便于及时发现问题。

四、死信队列的作用

  1. 提高系统稳定性:死信队列可以把异常消息隔离出去,防止它们阻塞正常的业务流程,让系统运行更加稳定。
  2. 方便问题排查:所有的异常消息都集中存放在死信队列里,方便开发人员去分析问题,还能对出现问题的业务进行补偿操作。
  3. 保证消息完整性:通过死信队列,避免了消息丢失的情况,保证了业务数据的一致性。

五、死信队列的简单实现示例(RabbitMQ)

下面基于RabbitMQ,给大家展示一个死信队列的配置和使用示例,帮助大家理解它的工作流程。

import com.rabbitmq.client.*;

public class DeadLetterExample {
    // 定义业务交换机名称
    private static final String EXCHANGE_NAME = "business_exchange";
    // 定义业务队列名称
    private static final String QUEUE_NAME = "business_queue";
    // 定义死信交换机名称
    private static final String DLX_EXCHANGE = "dead_letter_exchange";
    // 定义死信队列名称
    private static final String DLQ_NAME = "dead_letter_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明死信交换机和死信队列
            channel.exchangeDeclare(DLX_EXCHANGE, BuiltinExchangeType.DIRECT);
            channel.queueDeclare(DLQ_NAME, true, false, false, null);
            channel.queueBind(DLQ_NAME, DLX_EXCHANGE, "dlx_routing_key");

            // 业务队列参数,绑定死信交换机和路由键
            Map<String, Object> argsMap = new HashMap<>();
            argsMap.put("x-dead-letter-exchange", DLX_EXCHANGE);
            argsMap.put("x-dead-letter-routing-key", "dlx_routing_key");
            argsMap.put("x-message-ttl", 10000); // 消息10秒后过期

            // 声明业务交换机和业务队列
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            channel.queueDeclare(QUEUE_NAME, true, false, false, argsMap);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "business_key");

            // 发送一条消息
            String message = "Hello, RabbitMQ with DLQ!";
            channel.basicPublish(EXCHANGE_NAME, "business_key", null, message.getBytes("UTF-8"));
            System.out.println("Sent message: " + message);

            // 消费业务队列,模拟拒绝消息,触发死信
            channel.basicConsume(QUEUE_NAME, false, (consumerTag, delivery) -> {
                String msg = new String(delivery.getBody(), "UTF-8");
                System.out.println("Received message: " + msg);
                // 拒绝消息且不重新入队,消息进入死信队列
                channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
            }, consumerTag -> {});

            // 消费死信队列
            channel.basicConsume(DLQ_NAME, true, (consumerTag, delivery) -> {
                String deadMsg = new String(delivery.getBody(), "UTF-8");
                System.out.println("Dead letter received: " + deadMsg);
            }, consumerTag -> {});

            // 保持程序运行,观察输出
            Thread.sleep(20000);
        }
    }
}

代码说明

  • 业务队列设置了x-dead-letter-exchangex-dead-letter-routing-key,这样当消息被拒绝或者过期时,就会自动转发到死信交换机,再由死信交换机根据路由键把消息路由到死信队列。
  • 消费者在处理业务队列消息时,调用basicReject方法拒绝消息,并且第二个参数设置为false,表示不把消息重新放回队列,这样消息就会进入死信队列。
  • 通过消费死信队列,可以查看进入死信队列的消息,方便后续处理。

六、消息队列的其他基础知识

(一)消息

消息是传递数据的载体,它不仅包含实际的数据内容(消息体),还带有一些属性。

(二)主题

主题是消息的分类标识。生产者把消息发送到特定的主题,消费者通过订阅感兴趣的主题来获取消息。

(三)队列

队列是用来存储消息的容器,一般按照先进先出(FIFO)的顺序处理消息。

(四)消费模式

  1. 集群消费:这种模式下,多个消费者共同处理消息,实现负载均衡,提高消费效率。
  2. 广播消费:所有的消费者都会接收到每一条消息。

(五)消息模型

  1. 点对点模型:一个消息只会被一个消费者消费。
  2. 发布/订阅模型:多个消费者可以订阅同一个消息,都能接收到这条消息。

(六)可靠性保障

  1. 消息确认(ACK):消费者处理完消息后,向消息队列发送确认消息,确保消息不会丢失。
  2. 重试机制:当消息消费失败时,会按照一定的策略进行重试。
  3. 顺序消息保证:确保消息按照特定的顺序被消费。

(七)异步处理与削峰

生产者发送消息后不需要等待处理结果,直接返回。在流量高峰时,消息队列可以缓冲大量消息,平滑系统负载。

(八)延时和优先级

  1. 延时消息:可以设置消息在指定时间后才被消费。
  2. 优先级消息:重要的消息可以设置更高的优先级,优先被处理。

(九)性能监控与优化

通过监控消息队列的响应时间、传输速率、积压数量等指标,采取增加消费者、批量处理消息等方式,对消息队列的性能进行优化。

(十)流控与幂等性

  1. 流控:防止消费者因为消息过多而导致过载。
  2. 幂等性:保证消息无论被消费多少次,对业务的影响都是一样的,避免重复消费带来的问题,确保业务的一致性。

(十一)架构设计

采用分布式部署和多级队列设计,可以提升消息队列的容错能力和扩展能力。

七、典型死信策略配置示例(RabbitMQ)

# 声明死信交换机
rabbitmqadmin declare exchange name=dlx_exchange type=direct durable=true

# 声明死信队列
rabbitmqadmin declare queue name=dlq durable=true

# 绑定死信队列到死信交换机
rabbitmqadmin declare binding source=dlx_exchange destination=dlq routing_key=dlx_key

# 声明业务队列,绑定死信交换机和路由键
rabbitmqadmin declare queue name=business_queue durable=true arguments='{"x-dead-letter-exchange":"dlx_exchange","x-dead-letter-routing-key":"dlx_key","x-message-ttl":60000}'

# 绑定业务队列到业务交换机
rabbitmqadmin declare binding source=business_exchange destination=business_queue routing_key=business_key

上述命令分别完成了声明死信交换机、死信队列,绑定死信队列到死信交换机,声明业务队列并绑定死信交换机和路由键,以及绑定业务队列到业务交换机的操作。

八、总结

死信策略是消息队列处理异常消息的重要手段,能把无法正常消费的消息隔离到死信队列,保证正常业务不受影响。死信消息产生的原因多样,通过合理配置死信交换机和死信队列,系统可以自动转移死信消息,便于后续处理。监控死信消息指标,对于保障消息系统的稳定运行至关重要。除了死信策略,消息队列还有很多其他的基础知识点,如消息模型、消费模式等。希望通过本文的介绍和示例代码,能帮助大家快速掌握死信策略的核心概念和实现方法。


版权声明:本站文章,如无说明,均为本站原创,转载请注明文章来源。如有侵权,请联系博主删除。
本文链接:https://www.panziye.com/back/17376.html
喜欢 (0)
请潘老师喝杯Coffee吧!】
分享 (0)
用户头像
发表我的评论
取消评论
表情 贴图 签到 代码

Hi,您需要填写昵称和邮箱!

  • 昵称【必填】
  • 邮箱【必填】
  • 网址【可选】