章
目
录
消息中间件(如Kafka、RocketMQ等)普通消息有时候存在乱序的情况,那么,当MQ出现消息乱序时,该如何解决呢?
一、消息乱序的产生及影响
消息乱序是指消息在传递和处理过程中,出现顺序与预期不符的现象。例如在一次下单过程中,按照正常逻辑,支付消息应先于发货消息处理,因为只有完成支付才会进行发货操作。但在实际情况中,像虚拟商品业务场景下,若存在网络延迟,就可能导致发货消息先于支付消息被投递,这就是典型的消息乱序。
消息乱序会给系统处理带来一系列问题。比如,在系统处理过程中,如果A消息还未处理,B消息就先被处理,可能会导致处理失败;或者直接处理成功了B消息,等A消息到来时却无法处理。这些情况都会影响系统的正常运行,所以解决消息乱序问题至关重要。
二、解决消息乱序的常见方法
(一)使用顺序消息
对于有明确顺序要求的消息,如支付消息和发货消息,可以采用顺序消息的方式进行发送。具体做法是将这些消息按照顺序投递到同一个partition(队列)上,利用分区的顺序性来保证消息的顺序投递。同时,为了完全避免消息乱序,还需要确保只有一个消费者进行串行消费。这样,消息就能按照发送的顺序依次被处理,从而避免乱序问题。
(二)前置状态判断
我们可以在消息体中增加一个前置状态信息,例如beforeStatus
。以消费支付消息和发货消息的系统为例,系统可以根据这个beforeStatus
来判断与当前系统状态是否一致。若一致,说明可以处理该消息;若不一致,则表明所需的前置消息还未到达,此时可将该消息处理失败,让MQ下次重新投递。
不过,采用这种方案有两个必要条件:
- 消息要具备推进单据状态的能力。比如支付消息能够将订单状态从待支付推进到已支付。
- 消息的状态必须是单向变化的,不能出现从待支付推进到已支付后,又变回待支付的情况。
通过这种方式,利用状态判断来确保消息的有序处理。
(三)增加序列号
如果无法保证消息发送和处理的顺序,且没有状态用于前置判断时,可以在消息中引入序列号。具体操作如下:
- 在消息中附加一个递增的序列号,为每个消息赋予一个唯一的顺序标识。
- 消费端接收到消息后,使用缓冲区缓存消息,并根据序列号对消息进行重新排序,之后再进行处理。
需要注意的是,这种方法虽然能解决消息乱序问题,但会增加系统的复杂度。同时,为了处理可能丢失的消息,还需要设置缓存超时时间。
(四)自行实现排序
这是一种对上述第二种和第三种方案进行优化的方法。
第二种方案依赖MQ的重新投递,存在消息丢失的风险,因为长时间无法消费的消息可能不再重投。而且,频繁的重试会导致消息堆积,给系统造成压力。第三种方案则需要在内存中维护一个队列来进行排序,实现起来较为麻烦。
自行实现排序的流程如下:
- 接收到消息后,进行基本的前置校验,如检查消息的幂等性、参数是否齐全等。若校验不通过,直接返回失败。
- 消息校验成功后,将消息体转换为自定义的内部事件,以便后续解析和处理。
- 将这个事件存储到数据库中,并将状态设置为待处理。
- 若数据库保存成功,立即返回消息处理成功。
- 在返回之前,开启异步线程处理该事件。若处理成功,将消息状态设置为已处理;若处理失败,可以选择不修改消息状态,或者将其改为失败状态,并增加执行次数。
- 启动一个异步任务,定时扫描事件表中未成功处理的事件进行重试。
通过这种方式,确保所有事件都能被存储,存储后立即返回,避免消息重投和堆积。基于存储的消息,还能进行排序和重试操作。若某个消息处理失败,可不断重试,达到一定次数后报警,由人工跟进处理。
三、进一步优化
为了减少上述方案中定时任务带来的延迟,可以借助Redis进行优化。在将消息写入消息表时,以业务单号(如订单号)作为key,将存入消息的主键id作为value,存储到Redis中。
当有新消息到来时,先正常处理。若处理成功,去Redis中查询是否存在相同业务单号的待处理消息。若存在,根据存储的主键id查询对应的事件,并放入线程池中进行处理。这样可以更及时地处理相关消息,提升系统的整体性能。
通过以上几种方法及其优化策略,可以有效地解决MQ消息乱序问题,在实际应用中,我们可以根据具体的业务场景和需求,选择合适的解决方案。