章
目
录
RocketMQ作为一款高性能的消息队列,和Kafka类似,它支持基于队列(分区)的顺序消费,即同一队列内的消息能保证有序,不同队列间的消息则无序。下面,咱们就深入探讨下在RocketMQ中如何保证消息的顺序性。
一、消息发送端保障顺序性
当我们以RocketMQ生产者的身份发送顺序消息时,需要在send
方法里传入一个MessageQueueSelector
。这个MessageQueueSelector
里有个select
方法,它的作用是决定消息该被发送到哪个MessageQueue
。通常,我们会用取模法来进行路由,具体代码如下:
SendResult sendResult = producer.send(
msg,
new MessageQueueSelector() {
@Override
// mqs:该Topic下所有可选的MessageQueue
// msg:待发送的消息
// arg:发送消息时传递的参数
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
//根据参数,计算出一个要接收消息的MessageQueue的下标
int index = id % mqs.size();
//返回这个MessageQueue
return mqs.get(index);
}
},
orderId);
这段代码里,arg
参数会被转化为整数id
,然后通过id
对mqs
(该Topic下所有可选的MessageQueue
列表)的大小取模,得到一个下标,进而确定消息要发送到的MessageQueue
。需要注意的是,这里建议使用同步发送的方式,这样才能保证消息按顺序发送到指定队列。
二、消息消费端保障顺序性
消息按顺序发送到消息队列后,消费者要如何按发送顺序消费呢?RocketMQ的MessageListener
回调函数提供了两种消费模式:有序消费模式MessageListenerOrderly
和并发消费模式MessageListenerConcurrently
。若要实现顺序消费,得采用有序消费模式,示例代码如下:
consumer.registerMessageListener(
new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage (List<MessageExt> msgs, ConsumeOrderlyContext context) {
System.out.printf("Receive order msg:" + new String(msgs.get(0).getBody()));
return ConsumeOrderlyStatus.SUCCESS;
}
});
当我们用这种方式注册一个消费者后,为确保同一队列中的有序消息能顺序消费,还得保证RocketMQ的Broker只会把消息发送到同一个消费者上,这就需要加锁了。
在ConsumeMessageOrderlyService
初始化时,会启动一个定时任务,该任务会尝试向Broker为当前消费者客户端申请分布式锁。如果成功获取锁,后续消息就会只发给这个Consumer。
消息拉取过程中,消费者会一次性拉取多条消息,并将这些消息放入ProcessQueue
,同时提交到消费线程池执行。那拉取后的消费过程如何保证顺序性呢?这就涉及到更多的锁机制了。
RocketMQ在消费时,需要申请MessageQueue
锁,确保同一时间只有一个线程能处理队列中的消息。获取MessageQueue
锁后,就可以从ProcessQueue
中依次拉取一批消息处理。但在这个过程中,为避免消息重复消费,还得对ProcessQueue
加锁,这部分内容在后面扩展知识中会详细展开。之后,就可以开始处理业务逻辑了。
总的来说,为保障消息顺序性,RocketMQ在消费过程中进行了三次加锁:首先锁定Broker上的MessageQueue
,确保消息只投递到唯一的消费者;接着对本地的MessageQueue
加锁,保证只有一个线程能处理该消息队列;最后对存储消息的ProcessQueue
加锁,防止在重平衡过程中出现消息重复消费。
三、加锁机制的扩展知识:第三把锁的作用
前面提到客户端加锁过程中有三把锁,可能有人会疑惑:第三把锁如果不加,是不是也没问题呢?毕竟我们已经对MessageQueue
加锁了,为什么还需要对ProcessQueue
再次加锁?
这主要是考虑到重平衡的情况。当消费者集群新增消费者,发生重平衡时,原本由客户端A消费的某个队列,可能会重新分配给客户端B。这时,客户端A需要把加在Broker上的锁解掉。但在解锁过程中,要确保消息不会在消费过程中被移除。因为如果客户端A正在处理一部分消息,而位点信息还未提交,此时若客户端B立马去消费队列中的消息,就可能导致部分数据被重复消费。
那如何判断消息是否正在消费中呢?这就要通过ProcessQueue
上的锁来判断。也就是说,解锁的线程也需要尝试对ProcessQueue
加锁,只有加锁成功才能进行解锁操作,以此避免在解锁过程中有消息被消费。
四、顺序消费存在的问题
通过上述介绍可知,RocketMQ的顺序消费是通过在消费者上多次加锁实现的。这种方式虽然保证了消息顺序性,但也带来了一些问题。比如,加锁操作会降低系统的吞吐量,而且如果前面的消息出现阻塞,后续更多消息也会跟着阻塞。所以在实际应用中,顺序消息要谨慎使用,需根据具体业务场景权衡利弊。