消息队列是一个我们经常使用的中间件,那么我们在使用它的时候经常会被问到一些问题,比如说消息队列如何处理重复消息,消息队列如何防止消息丢失等等内容,那么照片文章就来看一下如何处理消息队列中的重复消费消息的问题。
假设有这样一个场景:“在我们的电商系统中,订单创建后会发送一条消息,下游的优惠券兑换系统会订阅这个消息,然后发放优惠券。我们的系统需要确保每一张优惠券,无论网络如何波动、系统如何异常,都只能被成功兑换一次。你会如何设计呢?”
这个问题看似简单,但它背后考验的是工程师对分布式系统复杂性的理解,尤其是对“幂等性”这一核心概念的掌握程度。很多同学的第一反应可能是“消息队列不是有‘exactly-once’(精确一次)的保证吗?”。但事实上,绝对的“精确一次”在分布式系统中是一个难以达到的理想状态,都需要业务方配合于重试和幂等来达成。
为什么消息会重复
我们在分析这个问题的时候可以拆分为消息队列的几个过程,分别是生产者将消息发送到broker的过程以及消费者从broker中获取数据并消费的过程
我们可以依次来分析
- 生产者重复发送:因为网络超时等原因没有收到broker的通知,但此时broker已经接收到了消息,这时生产者使用重试机制重复发送消息。
- 消费者重复消费:这个过程是消费者在从broker中获取到消息之后,它执行完消息对应的操作了,但是还没有提交消息,这时消费者宕机了,然后在消费者重启过来之后由于没有提交偏移量,导致重复消费。
对于生产者重复消费的问题我们并不需要过多的担心,因为我们的kafka本身就有幂等性的约束,它是这么实现的,Kafka 的发送幂等是通过序列号来实现的,每个消息都会被分配一个序列号,序列号是递增的,这样就可以保证消息的顺序性。当生产者发送消息时,会将消息的序列号和消息内容一起写入到日志文件中,下次收到非预期序列号的消息就会返回 OutOfOrderSequenceException 异常。
那么我们主要解决的就是消费者的重复消费问题了,那消费者这么保重永不重复呢,其实保证不了,我们能做的就是让我们的消费端服务具备幂等处理消息的能力。所谓幂等,就是一个请求无论被重复执行多少遍,其系统状态产生的相应都与第一次执行时完全相同。
那如何实现幂等操作呢?最好的方式就是从业务逻辑设计上入手,将香妃的业务逻辑设计成鱼油幂等性的操作。
实现方法
数据库唯一id
这是最简单、最直接,也是最常用的一种方案。其核心思想是,利用数据库中“唯一索引”或“主键”的特性,来阻挡重复数据的插入。
假设我们有一个电商系统,用户下单后会发送一条消息,触发给用户增加积分的操作。消息内容可能包含{ "order_id": "202508310001", "user_id": 58, "points_to_add": 100 }。
这个“增加积分”的操作,天然是非幂等的。我们可以这样改造:
- 建立一张积分流水表(points_log)。
- 表中包含字段:
id(自增主键),order_id(订单ID),user_id(用户ID),points(变更积分),create_time。 - 关键一步: 对
order_id这个字段建立一个唯一索引。
-- 尝试插入积分流水记录
-- 假设 order_id 字段上有唯一索引
INSERT INTO points_log (order_id, user_id, points) VALUES ('202508310001', 58, 100);
- 第一次消费:该订单ID首次出现,
INSERT操作成功。然后我们可以安全地去更新用户的总积分。 - 重复消费:MQ再次投递相同的消息,消费者尝试
INSERT时,数据库会因为order_id的唯一索引冲突而直接报错。我们的代码捕获这个异常后,就可以知道这是重复操作,直接忽略并返回ACK即可。
这是一种最简单的实现情况,面试的时候,为了展现你的思考能力,还可以做一个适当延伸,说明下这种方案的优缺点,以及扩展性
这种方案的优点是: 实现简单,成本低,效果可靠。 缺点也很明显: 强依赖数据库特性,对于非数据库操作的场景无能为力。
基于这个思路,如果不用关系型数据库,Redis的
SETNX命令(SET if Not eXists)也能达到异曲同工的效果,可以用order_id作为key,实现分布式锁或状态记录。
版本号机制
对于数据插入操作,使用唯一索引是可行的,但是如果我们不是插入操作而是更新操作呢,这个时候又要怎么保证消息的唯一性呢?
这时我们可以引入‘’版本号‘‘机制,这也就是经常说的乐观锁。
我们可以在消费消息的时候带上一个version,默认是0或者1,然后把他写入到消息当中,这时我们如果要执行更新操作,先从数据库中的到version列,然后比对version和消息中的version是否一致,如果一致,就更新消息并且将version+1,如果不一致,那就不更新
这个优点:就是使用的范围相比于唯一性约束要广很多,
不足:
首先它增加了一个消息列这可能会导致带宽增加,其次它更改了mysql的数据列数,这有一定的侵入性,并且加重了消费者的负担,因为消费者在更新数据的时候会先将version从数据库中查询出来,然后再更新。
亮点方案
全局唯一ID+单独的防重表
放重表也叫幂等记录表,这个方案的核心思想就是为每一个消息操作生成一个全局唯一的ID,然后再消费者执行消息之前将其插入到放重表当中,如果插入成功,证明这个消息是第一次被操作,如果插入失败就证明已经被消费过了直接丢弃就好。
如果业务复杂,可以采用防重表的方案,将业务逻辑和幂等逻辑解耦。单独建立一张防重表,具体的步骤如下:
- 为每条消息生成一个全局唯一ID(GUID)。这个ID可以在生产者发送时就放入消息体或Header中。
- 建立一张“消费记录表”(consumed_log),表结构很简单,核心就是一个字段
message_id,并将其设为主键或唯一索引。 - 消费者处理逻辑变为一个“三段式”:
- a. 开启事务。
- b.
INSERT消息的GUID到consumed_log表中。 - c. 执行真正的业务逻辑(更新数据库、调用RPC等)。
- d. 提交事务。
这样如果是重复消息的话,就会插入消费记录表失败,就不会执行后面的业务逻辑了
那这个有没有问题呢:如果没有事物怎么办,比如说在分布式环境下跨库要怎么处理
异步校对
因为在微服务框架下,业务操作往往是跨域的,比如“扣减库存”和“创造物流订单”分别由两个不同的服务实现,这时本地事物就失效了,这时怎么办呢。
我们可以采用一个异步校对的机制,整个流程分为一下三步:
- 预操作:收到消息之后第一步操作是向数据库中插入一个id和PROCESSING,这是幂等性的关键防线。
- 执行业务:然后调用库存、物流等下游服务,执行核心业务逻辑。
- 确认操作:当所有的业务逻辑都执行成功之后会讲表中的状态改为”已完成“,这时如果再有数据来的话就不会被执行
那么这里就有几种失败的情况,第一种是插入id和PROCESSING都失败了,这种情况其实并不影响我们的一致性,因为什么都没发生,我们也没有提交偏移量,那么事件来的时候我们再次执行就好了,这是一种,但是如果多次失败我们就要记录下来进行人工处理了。
还一种是任务执行成功了,但是确认操作失败了,这时我们的数据库中规定状态还是PROCESSING状态,那么就是我们的异步校对起作用的地方了,我们的异步校对会查看长期处于PROCESSING的数据,然后到下游去看是否执行成功,如果执行成功就会把他的状态更新为SUCESS。
那最后就是如果任务也执行失败了呢,如果任务也执行失败了,我们的后台进程会讲执行状态标记为false,这时来消息的时候重新执行就好了,而且这里可以有一个重复次数,如果超过了重复的次数,我们就要人工介入了。
redis, 布隆过滤器加速
因为上面的都是要通过数据库的,所以速度上可能比较慢,所以我们可以有一个加速策略就是通过redis来进行加速,讲已经成功执行的id存入redis,这时如果有重复的任务到达就可以将其阻拦在外了。
当然如果也可以使用布隆过率器进行筛选,如果有可能存在假阳性问题,要重新到数据库校验,如果没有那就是新的业务。
Comments NOTHING