如何解决MQ消息乱序问题

后端 潘老师 1周前 (04-14) 13 ℃ (0) 扫码查看

消息中间件(如Kafka、RocketMQ等)普通消息有时候存在乱序的情况,那么,当MQ出现消息乱序时,该如何解决呢?

一、消息乱序的产生及影响

消息乱序是指消息在传递和处理过程中,出现顺序与预期不符的现象。例如在一次下单过程中,按照正常逻辑,支付消息应先于发货消息处理,因为只有完成支付才会进行发货操作。但在实际情况中,像虚拟商品业务场景下,若存在网络延迟,就可能导致发货消息先于支付消息被投递,这就是典型的消息乱序。

消息乱序会给系统处理带来一系列问题。比如,在系统处理过程中,如果A消息还未处理,B消息就先被处理,可能会导致处理失败;或者直接处理成功了B消息,等A消息到来时却无法处理。这些情况都会影响系统的正常运行,所以解决消息乱序问题至关重要。

二、解决消息乱序的常见方法

(一)使用顺序消息

对于有明确顺序要求的消息,如支付消息和发货消息,可以采用顺序消息的方式进行发送。具体做法是将这些消息按照顺序投递到同一个partition(队列)上,利用分区的顺序性来保证消息的顺序投递。同时,为了完全避免消息乱序,还需要确保只有一个消费者进行串行消费。这样,消息就能按照发送的顺序依次被处理,从而避免乱序问题。

(二)前置状态判断

我们可以在消息体中增加一个前置状态信息,例如beforeStatus。以消费支付消息和发货消息的系统为例,系统可以根据这个beforeStatus来判断与当前系统状态是否一致。若一致,说明可以处理该消息;若不一致,则表明所需的前置消息还未到达,此时可将该消息处理失败,让MQ下次重新投递。

不过,采用这种方案有两个必要条件:

  • 消息要具备推进单据状态的能力。比如支付消息能够将订单状态从待支付推进到已支付。
  • 消息的状态必须是单向变化的,不能出现从待支付推进到已支付后,又变回待支付的情况。

通过这种方式,利用状态判断来确保消息的有序处理。

(三)增加序列号

如果无法保证消息发送和处理的顺序,且没有状态用于前置判断时,可以在消息中引入序列号。具体操作如下:

  • 在消息中附加一个递增的序列号,为每个消息赋予一个唯一的顺序标识。
  • 消费端接收到消息后,使用缓冲区缓存消息,并根据序列号对消息进行重新排序,之后再进行处理。

需要注意的是,这种方法虽然能解决消息乱序问题,但会增加系统的复杂度。同时,为了处理可能丢失的消息,还需要设置缓存超时时间。

(四)自行实现排序

这是一种对上述第二种和第三种方案进行优化的方法。

第二种方案依赖MQ的重新投递,存在消息丢失的风险,因为长时间无法消费的消息可能不再重投。而且,频繁的重试会导致消息堆积,给系统造成压力。第三种方案则需要在内存中维护一个队列来进行排序,实现起来较为麻烦。

自行实现排序的流程如下:

  • 接收到消息后,进行基本的前置校验,如检查消息的幂等性、参数是否齐全等。若校验不通过,直接返回失败。
  • 消息校验成功后,将消息体转换为自定义的内部事件,以便后续解析和处理。
  • 将这个事件存储到数据库中,并将状态设置为待处理。
  • 若数据库保存成功,立即返回消息处理成功。
  • 在返回之前,开启异步线程处理该事件。若处理成功,将消息状态设置为已处理;若处理失败,可以选择不修改消息状态,或者将其改为失败状态,并增加执行次数。
  • 启动一个异步任务,定时扫描事件表中未成功处理的事件进行重试。

通过这种方式,确保所有事件都能被存储,存储后立即返回,避免消息重投和堆积。基于存储的消息,还能进行排序和重试操作。若某个消息处理失败,可不断重试,达到一定次数后报警,由人工跟进处理。

三、进一步优化

为了减少上述方案中定时任务带来的延迟,可以借助Redis进行优化。在将消息写入消息表时,以业务单号(如订单号)作为key,将存入消息的主键id作为value,存储到Redis中。

当有新消息到来时,先正常处理。若处理成功,去Redis中查询是否存在相同业务单号的待处理消息。若存在,根据存储的主键id查询对应的事件,并放入线程池中进行处理。这样可以更及时地处理相关消息,提升系统的整体性能。

通过以上几种方法及其优化策略,可以有效地解决MQ消息乱序问题,在实际应用中,我们可以根据具体的业务场景和需求,选择合适的解决方案。


版权声明:本站文章,如无说明,均为本站原创,转载请注明文章来源。如有侵权,请联系博主删除。
本文链接:https://www.panziye.com/back/17234.html
喜欢 (0)
请潘老师喝杯Coffee吧!】
分享 (0)
用户头像
发表我的评论
取消评论
表情 贴图 签到 代码

Hi,您需要填写昵称和邮箱!

  • 昵称【必填】
  • 邮箱【必填】
  • 网址【可选】