kafka如何处理消费挤压

spigcoder 发布于 2025-09-01 161 次阅读


看到这个问题不知道各位是不是第一时间想到的是增加消费者数量,这到底行不行呢,只能说有可能行。

这是因为kafka的partition策略,它保证同一个partition只能被一个消费者组的一个消费者消费,那么如果partition的数量多于消费者组中消费者的数量,那么增加消费者是可以的,但如果现在消费者组中消费者的数量已经等于甚至大雨partition的数量了,那么只是增加消费者的数量是没有意义的。

那么我们现在知道了分区数是消费能力的天花板,那么该如何设置分区呢

如何设置topic中的分区

既然分区数是消费能力的天花板,那么在Topic创建之初,科学地规划分区数量就显得至关重要。这是一种主动防御,能有效避免未来的很多麻烦。

那么,这个“合理”的分区数,到底该如何确定呢?

业界并没有一个放之四海而皆准的公式。最严谨的方式,当然是利用MQ自带的压测脚本(如Kafka的kafka-producer-perf-test.sh),在测试环境中模拟生产环境的消息大小、吞吐量,通过不断调整分区数和消费者线程数来找到最佳值。但现实是,很多团队没有这样的测试条件,或者不敢轻易在生产环境进行压测。

在这种情况下,我在这里分享一个在实践中总结的、简单有效的估算方法:

  1. 评估生产者峰值吞吐:首先预估业务高峰期,所有生产者写入消息的总速率。这需要和业务方充分沟通,了解未来的增长预期和活动规划。数据来源可以是历史监控数据(如Prometheus/Grafana中的指标)、业务数据分析报表等。假设峰值为5000条/秒。
  2. 评估单分区写入上限:通过压测或咨询运维团队,了解当前MQ集群下单分区的写入性能极限。这个值受限于Broker的磁盘I/O、网络带宽、副本同步策略(acks参数)、消息压缩方式等多种因素。假设是250条/秒。
  3. 评估单消费者处理能力:在不考虑任何优化的情况下,评估单个消费者实例处理消息的平均速率。这个速率的瓶颈通常不在于消费者本身,而在于其处理逻辑中涉及的外部依赖,比如数据库写入、RPC调用等。可以通过对消费逻辑进行性能剖析(Profiling)来精确测量。假设是100条/秒。

基于以上数据,我们可以计算出两个所需的分区数:

  • 满足生产需求的分区数 = 生产者峰值吞吐 / 单分区写入上限 = 5000 / 250 = 20个分区。
  • 满足消费需求的分区数 = 生产者峰值吞吐 / 单消费者处理能力 = 5000 / 100 = 50个分区。

为了确保生产者不被阻塞,且消费者能及时处理,我们应该取两者中的最大值,即50个分区。在此基础上,再增加一些冗余(比如10%~20%),最终确定为55或60个分区,以应对未来的业务增长和流量波动。

当你在面试中给出这个计算方法后,如果想让回答更上一层楼,就不能止步于此。面试官其实更想听到你对“权衡”的理解。你可以接着补充

“不过,分区数也并非越多越好。过多的分区会增加Broker元数据管理的开销和客户端的内存消耗。更重要的是,过多的分区会显著延长消费者组发生再均衡(Rebalance)的时间,在此期间整个消费组是停止消费的,反而可能加剧消息积压。所以,这是一个需要在吞吐能力和系统开销之间寻找平衡的决策。”

应对挤压的快速解决方法

面对消息挤压时我们首先应该做的是判断为什么会出现消息挤压

  • 突发流量所致:由于某个活动或突发事件导致的短暂流量高峰,消费者的处理能力本身是足够的。这种情况在电商领域是非常常见的,比如进行某个促销活动,往往都会伴随着短暂的流量高分,活动已过,流量又恢复正常。此时,我们可以通过监控指标(如消费滞后量Consumer Lag、生产速率、消费速率)来估算恢复时间。例如,积压了100万条,消费速率比生产速率快1000条/秒,那么大约需要 1,000,000 / 1000 = 1000秒 ≈ 17分钟 就能恢复。如果你的业务对17分钟的延迟完全可以接受,那么我们甚至可以不进行干预。如果持续时间相对较长,就可以考虑我们后续介绍的接种办法
  • 消费者能力不足:消费者的整体处理能力已经跟不上生产者的速度,积压量会持续增长,这时就必须采取行动了。这种情况往往可能是因为我们的业务代码有一些隐藏的故障,导致消费能力很弱,或者是随着时间的拉长,原本数据库里的表数据变得越来越大,此时在数据存储层的处理时间越来越久,拖慢了整个消费速度,这个时候就需要我们去进行一些慢SQL优化之类的工作了

接下来,我们就来看看,应对消费能力不足,并且在消费者数量已经等于分区数的前提下,要快速解决问题有哪些可行的方案?

扩容分区

最先想到应该就是增加分区,我们增加了分区就可以增加消费者组的消费者数量,那么不就提高了消费能力,是不是就可以解决消费挤压的问题,

但是扩容分区虽然简单但是同样出现问题,首先有的公司是不允许随便扩容分区的,因为扩容分区会增加broker的维护成本,并且扩容分区很可能会导致之前在kafka中的消息被分配到别的消费者从而造成消息顺序不一致的问题,这个方法在消息顺序上讲过。

创建新的topic

既然扩容分区可能会造成上面的影响,那么我们可以看能否新创建一个topic,并且新的topic的分区数大于我们估算的需要的分区数

让生产者向新的topic中写入数据,同时消费者新建一个消费者组,用于处理新topic中的数据,同时旧的消费者组处理原topic中的数据,当旧的消费者组数据消费完成之后下线旧的消费者,实现平稳过渡。

还有一种方法是专门部署一个“搬运工”服务,它作为消费者从旧Topic中拉取积压数据,然后作为生产者,将这些数据转发到新Topic中。这个“搬运工”服务自身也需要保证高可用和高性能,然后我们只需要一个消费者来消费新topic中的数据。

上面两种方法都是在更改mq的各种高配置,那么我们是否可以增加消费者的消费能力呢

增加消费者的消费能力

首先我们要做的是看是否存在慢sql等数据库服务来拖垮我们的消费水平,如果有可以使用分库分表或者创建索引的方法解决问题

引入降级策略

在某些业务场景下,消费逻辑并非“全有或全无”。我们可以借鉴微服务治理中的“降级”思想,在消息积压时,有策略地放弃一些非核心操作,以换取整体处理速度的提升。例如,一个用户动态(Feed)更新的消费者,其主要逻辑是:调用用户服务、调用内容服务、计算权重分、写入缓存。在消息积压时,我们可以引入降级策略:在处理消息前,先检查该动态的Feed缓存是否存在。如果缓存已存在,则跳过后续所有复杂的计算和调用,直接认为处理成功。这个逻辑的依据是:既然数据有10分钟的缓存,那么在积压的紧急情况下,用户暂时看到几分钟前的旧数据是可以接受的。

将微服务治理的思想灵活运用到消息消费场景,能向面试官展现你知识体系的广度和解决问题的灵活性。

批处理

观察生产者的行为,有时也能发现优化的契机。设想一个批量更新商品库存的场景。上游系统每当一个商品库存变更,就发送一条消息。当有成千上万的商品需要同时更新时,就会产生海量的单条消息。

我们可以对生产者进行改造,让它将短时间内的多个库存变更聚合成一条消息再发送。相应地,消费者也改造为支持批量处理。一次数据库操作处理上百个商品的库存更新,其效率远高于执行上百次单独的更新操作。

在面试时,一个更能体现你主动性的说法是:“即使生产者无法改造,我们也可以只优化消费者。让消费者一次拉取一批消息(如100条),然后在内存中将这些消息构造成一个批量请求,再一次性提交给下游服务或数据库。这种‘消费侧聚合’同样能取得不错的效果,更能体现我们作为消费端负责人的担当和优化能力。”

异步消费 + 批量提交

其实消费挤压慢点问题主要在于我们需要在从消息队列中读取消息,然后消费,然后提交,这个过程可能比较漫长,那我们是否可以采用异步消费的方法呢,就是我们使用一个中间层,他就负责从消息队列中读取消息,然后通过协程池或者其他技术将消息的处理过程分发出去,这样是不是就可以大大的增加消费的速度

image-20250831222516412

这样一来,拉取消息的I/O操作和处理消息的CPU/I/O密集型操作就完全分离开来,互不干扰,整体吞吐能力大大增强。

在介绍这个终极方案时,面试官一定会追问其复杂性。你需要主动、深入地探讨该模型带来的三大挑战及其解决方案,这才是体现你架构设计能力的关键。

挑战一、二:消息丢失、重复消费问题

这里我们只是转发消息,那什么时候提交呢,如果我们把所有的消息都分发出去之后就提交会有什么问题:如果消息提交了但是大部分消息的消费都失败了,会出现消息丢失问题,如果消息都消费完了系统宕机了会出现消息重复消费问题,我们首先要解决这两个问题。

其实对于消息重复消费问题相对比较好解决,我们可以使用保证消费逻辑的幂等性(Idempotence)。这是解决重复消费问题的唯一正确途径。无论同一条消息被处理多少次,其最终产生的结果都应该是一致的。实现幂等性的常见方法包括:使用数据库唯一键约束、乐观锁(版本号机制)、或是在处理前查询状态等。

那么消息的丢失问题如何解决呢,应对这种情况我们就可以考虑批量提交。消费者线程一次性拉取一批消息(比如100条),分发给工作线程池。然后,它会等待这100条消息全部被工作线程处理完毕后,才一次性向MQ提交这批消息的最高位点。

挑战三:部分失败问题

我们这样的批次消费,如果99%的消息都消费成功了只有1%的消息没有被消费,那么我们应该怎么操作

同步重试

对于消息消费失败的消费者可以设置重试次数和超时时间,让其重新消费,但是这有一个问题就是我们消息提交是要等待所有的消费者都消费完成才会提交,如果使用重试的策略就会导致其他消费完成的消费者等待这个重试的消费者的场景。

异步重试

既然同步重试不行,那异步重试是否可以,也有问题,如果异步重试也失败了呢,还有他会增加代码的复杂性

重新写入消息队列

还有一种方法就是将消费失败的消息重新写入到消息队列,但要保证的是错误的在少数,而且这个错误是可以通过重试解决的,需要特别注意的是,必须在消息体中加入一个重试计数字段,当重试次数达到阈值(如3次)后,就不再重新投递,而是将消息记录到死信队列(Dead Letter Queue)或日志中,进行人工干预,以防止“毒丸消息”导致无限循环。