RocketMQ消息如何实现顺序性:原理、实现与问题解析

后端 潘老师 1周前 (04-15) 19 ℃ (0) 扫码查看

RocketMQ作为一款高性能的消息队列,和Kafka类似,它支持基于队列(分区)的顺序消费,即同一队列内的消息能保证有序,不同队列间的消息则无序。下面,咱们就深入探讨下在RocketMQ中如何保证消息的顺序性。

一、消息发送端保障顺序性

当我们以RocketMQ生产者的身份发送顺序消息时,需要在send方法里传入一个MessageQueueSelector。这个MessageQueueSelector里有个select方法,它的作用是决定消息该被发送到哪个MessageQueue。通常,我们会用取模法来进行路由,具体代码如下:

SendResult sendResult = producer.send(
        msg,
        new MessageQueueSelector() {
            @Override
            // mqs:该Topic下所有可选的MessageQueue
            // msg:待发送的消息
            // arg:发送消息时传递的参数
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                Integer id = (Integer) arg;
                //根据参数,计算出一个要接收消息的MessageQueue的下标
                int index = id % mqs.size();
                //返回这个MessageQueue
                return mqs.get(index);
            }
        },
        orderId);

这段代码里,arg参数会被转化为整数id,然后通过idmqs(该Topic下所有可选的MessageQueue列表)的大小取模,得到一个下标,进而确定消息要发送到的MessageQueue。需要注意的是,这里建议使用同步发送的方式,这样才能保证消息按顺序发送到指定队列。

二、消息消费端保障顺序性

消息按顺序发送到消息队列后,消费者要如何按发送顺序消费呢?RocketMQ的MessageListener回调函数提供了两种消费模式:有序消费模式MessageListenerOrderly和并发消费模式MessageListenerConcurrently。若要实现顺序消费,得采用有序消费模式,示例代码如下:

consumer.registerMessageListener(
    new MessageListenerOrderly() {
        @Override
        public ConsumeOrderlyStatus consumeMessage (List<MessageExt> msgs, ConsumeOrderlyContext context) {
            System.out.printf("Receive order msg:" + new String(msgs.get(0).getBody()));
            return ConsumeOrderlyStatus.SUCCESS;
        }
});

当我们用这种方式注册一个消费者后,为确保同一队列中的有序消息能顺序消费,还得保证RocketMQ的Broker只会把消息发送到同一个消费者上,这就需要加锁了。

ConsumeMessageOrderlyService初始化时,会启动一个定时任务,该任务会尝试向Broker为当前消费者客户端申请分布式锁。如果成功获取锁,后续消息就会只发给这个Consumer。

消息拉取过程中,消费者会一次性拉取多条消息,并将这些消息放入ProcessQueue,同时提交到消费线程池执行。那拉取后的消费过程如何保证顺序性呢?这就涉及到更多的锁机制了。

RocketMQ在消费时,需要申请MessageQueue锁,确保同一时间只有一个线程能处理队列中的消息。获取MessageQueue锁后,就可以从ProcessQueue中依次拉取一批消息处理。但在这个过程中,为避免消息重复消费,还得对ProcessQueue加锁,这部分内容在后面扩展知识中会详细展开。之后,就可以开始处理业务逻辑了。

总的来说,为保障消息顺序性,RocketMQ在消费过程中进行了三次加锁:首先锁定Broker上的MessageQueue,确保消息只投递到唯一的消费者;接着对本地的MessageQueue加锁,保证只有一个线程能处理该消息队列;最后对存储消息的ProcessQueue加锁,防止在重平衡过程中出现消息重复消费。

三、加锁机制的扩展知识:第三把锁的作用

前面提到客户端加锁过程中有三把锁,可能有人会疑惑:第三把锁如果不加,是不是也没问题呢?毕竟我们已经对MessageQueue加锁了,为什么还需要对ProcessQueue再次加锁?

这主要是考虑到重平衡的情况。当消费者集群新增消费者,发生重平衡时,原本由客户端A消费的某个队列,可能会重新分配给客户端B。这时,客户端A需要把加在Broker上的锁解掉。但在解锁过程中,要确保消息不会在消费过程中被移除。因为如果客户端A正在处理一部分消息,而位点信息还未提交,此时若客户端B立马去消费队列中的消息,就可能导致部分数据被重复消费。

那如何判断消息是否正在消费中呢?这就要通过ProcessQueue上的锁来判断。也就是说,解锁的线程也需要尝试对ProcessQueue加锁,只有加锁成功才能进行解锁操作,以此避免在解锁过程中有消息被消费。

四、顺序消费存在的问题

通过上述介绍可知,RocketMQ的顺序消费是通过在消费者上多次加锁实现的。这种方式虽然保证了消息顺序性,但也带来了一些问题。比如,加锁操作会降低系统的吞吐量,而且如果前面的消息出现阻塞,后续更多消息也会跟着阻塞。所以在实际应用中,顺序消息要谨慎使用,需根据具体业务场景权衡利弊。


版权声明:本站文章,如无说明,均为本站原创,转载请注明文章来源。如有侵权,请联系博主删除。
本文链接:https://www.panziye.com/back/17293.html
喜欢 (0)
请潘老师喝杯Coffee吧!】
分享 (0)
用户头像
发表我的评论
取消评论
表情 贴图 签到 代码

Hi,您需要填写昵称和邮箱!

  • 昵称【必填】
  • 邮箱【必填】
  • 网址【可选】