1.消息队列积压了怎么办?
1.消息队列积压了怎么办?
大家好,我是牛哥。
今天想跟大家分享的主题是消息队列积压了怎么办。
消息队列积压不是单一环节的问题,而是 生产 - 传输 - 消费 全链路供需失衡的结果,既可能是突发流量冲垮了处理能力,也可能是设计疏漏留下了隐患。
我们今天不搞晦涩的理论,只讲能落地的办法,比如怎么通过批量处理提效率、怎么用资源隔离保核心、怎么靠死信队列避免阻塞。希望不管是刚接触消息队列的同学,还是遇到过积压难题的朋友,都能从里找到能用的思路,少走点弯路,让系统的异步链路跑得更稳。
什么是消息积压?
消息队列的核心逻辑是 生产者发送消息 → 队列暂存消息 → 消费者消费消息。
当生产者发送消息的速度持续大于消费者处理消息的速度,或消费者完全停止处理时,未被消费的消息会在队列中不断堆积,这种状态就称为 消息队列积压
可以类比日常生活中的快递驿站:
生产者 = 快递员,不断送快递到驿站;
队列 = 驿站仓库,暂存快递;
消费者 = 取件人,从仓库拿快递;
积压 = 快递员送件速度远快于取件人,仓库里的快递堆成山。
积压若不及时处理,会从 队列异常 扩散到整个业务系统,最核心的就是消息延迟加剧,新入队消息处理时间越来越长,最终一直卡在队列之中,导致过了业务实效性,甚至因为积压太久、积压消息太多,触发消息回收策略,导致消息丢失。
所以面对积压,我们核心目标是短时间内降低队列堆积量,优先保障核心业务不受影响。
优先救火:快速解决积压
1. 临时扩容消费者(最直接有效)
快速扩容消费者集群,利用消息队列的负载均衡机制(如Kafka分区再平衡、RabbitMQ队列竞争),让更多节点并行处理消息。
注意,若队列有分区/分片限制(如Kafka),也就是消费者数量不可超过分区数(否则多余节点空闲),可临时增加分区(部分队列支持动态扩分区,如RocketMQ)。
2. 降级非核心逻辑,优先处理关键消息
当消息出现积压时,降级非核心逻辑、优先处理关键消息是快速缓解压力的有效手段,核心思路是 聚焦核心、简化流程,具体可以从两方面着手:
一方面是筛选消息优先级。先明确哪些是必须立刻处理的核心消息,比如订单支付结果、交易状态变更等直接影响用户体验和业务正确性的消息;而像营销推送、数据统计这类非核心消息,可以暂时让路。处理时让消费者直接跳过非核心消息(比如收到后直接标记为已处理),把资源集中在核心消息上,避免它们被非核心消息挤占而延迟。
另一方面是简化核心消息的处理流程。平时处理消息可能包含完整的校验、日志记录、多系统同步等步骤,积压时可以临时砍掉非必要环节。
比如暂时跳过非关键数据的校验、简化日志输出、把同步调用改成异步回调等。这样能缩短单条消息的处理时间,让消费者能更快地处理更多核心消息,等积压缓解后再恢复完整流程。
通过这种 抓大放小 的方式,既能保证核心业务不受影响,又能快速降低队列压力,为后续彻底彻底解决问题争取时间。
3. 拆分队列减轻压力
考虑这种场景,因为某个业务做活动,大量消息在几分钟内打入消息队列,积压了几亿条数据,假设你没有钱扩容,按现有能力要处理掉这些消息,需要1小时,然后此时每秒还有其它业务的消息,我们就假设1000/s,此时这些消息一旦进入消息队列,相当于会被前面的积压堵住。
这时候如果业务允许,其实可以选择保新,即新消息导入到新的消息队列,这样新消息可以得到及时处理,而旧消息,就等1小时之后慢慢消耗掉即可。
如果老队列积压的太厉害了,也可以用脚本,批量拉取老队列消息并转发至临时队列,由专项消费者集中处理,避免老队列持续阻塞。
根源排查:定位积压原因
等紧急缓解了积压压力,可不能就此结束,必须找到问题的根本原因,不然过不了多久可能还会再次出现积压。就像治病要找到病根一样,只有把源头问题解决了,才能避免重复救火,下面就说说常见的几个排查方向。
消费者问题
先看消费者这边,很多时候积压都是因为消费者消费速度不足。一方面可能是处理单条消息的速度太慢,比如可以通过监控看看单条消息要处理多久,如果超过 200 毫秒,那大概率有问题。
一种可能是处理逻辑太冗余,比如里面有好几轮同步的数据库查询,或者调用了很多没必要的外部接口,把时间都耗在了等待上;要么就是消费者的资源不够用了,CPU、内存或者 IO 被占满,机器跑不动自然处理得慢。
另一方面可能是消费者直接罢工了,比如服务突然宕机、网络断了连不上队列,或者消费者的线程池被耗尽,消息来了没人接收。
这种情况不用瞎猜,看看消费者的日志就能发现线索,比如有没有 ERROR 级别的异常信息,再结合监控告警,比如消费QPS一下变成零,这些都能帮我们快速定位消费中断的问题。
生产者问题
再说说生产者那边,有时候积压是因为消息突然变多了。比如遇到秒杀、促销这类活动,生产者发送消息的量会突然暴涨,原本平时每秒只发 1000 条,活动时一下子冲到每秒 10000 条,这时候消费者根本处理不过来,消息自然就堆起来了,只要看看生产者的 发送 QPS 曲线,就能清楚看到是不是流量突然激增导致的。
另外,还有可能是消息重复发送了,比如生产者的重试机制出了问题,没正确处理队列的 ACK 确认信号,导致同一条消息反复投递;
或者业务逻辑有 bug,比如某个循环没控制好,一直在发送同一条消息,这时候检查一下消息的唯一 ID,比如 msgId,看看有没有大量重复的就能确认。
队列本身配置
在排查消息队列积压问题时,还需重点审视队列自身的核心配置是否适配业务并发需求,这一点在 Kafka、RocketMQ 等主流分布式消息队列中尤为关键。
这类队列的分区或分片是实现并行消费的核心载体,其底层机制决定了单个分区同一时间只能被一个消费者实例占用消费,因此分区总数直接等同于消费端能达到的最大并行处理能力上限。
若分区数配置不合理,会直接造成消费资源浪费与处理能力瓶颈:例如某 Kafka 主题仅配置 3 个分区,即便为应对积压临时扩容至 10 台消费者机器,由于分区数量的限制,最终也只有 3 个消费实例能分配到分区并实际处理消息,剩余 7 台机器会处于空闲等待状态,无法参与消费任务,既浪费了服务器资源,也无法有效提升消息处理速度,难以缓解积压问题。
长期优化:建立防积压机制
想要从根源降低消息积压风险,我们可以通过技术优化与流程规范双管齐下,核心措施可分为以下四类:
优化消费者性能,提升处理效率
如果一条消息的处理逻辑里包含多个独立操作,没必要按顺序一个个等它们完成,完全可以让这些操作并行跑,这样能大幅缩短单条消息的处理时间。
比如之前处理一条订单消息,假设每个操作都是200ms,得先等订单数据写入数据库,这里是200ms,再等物流接口返回,又是200ms,最后等通知服务发完短信,从头到尾要600ms。
但其实这三个操作互相不依赖:写订单库不用等物流接口,发通知也不用等数据库写完,那就可以用多线程同时启动这三个任务:让一个线程去写数据库,另一个线程调用物流接口,第三个线程发通知,三个操作一起跑,最快 200ms 就能全完成,单条消息的处理效率直接几倍。
想提升消息消费的整体效率,批量处理也是个很实用的办法。可以先开启消费者的批量拉取功能,就像 Kafka 里的fetch.max.records参数,设置合适的值后,消费者一次能从队列里拉取多条消息,而不是一条一条地取,这样能减少和消息队列之间的网络交互次数。
拿到批量消息后,处理时也别一条一条往数据库里写,像用 MyBatis 的批量插入功能,把一批数据一次性提交给数据库,比起单条插入能大幅减少磁盘 IO 的次数。
网络和磁盘这两块最容易卡脖子的地方交互少了,整体的处理速度自然就提上来了,吞吐量也能跟着涨。不过批量大小得琢磨好,太大了可能占内存,太小了又体现不出批量的优势,得根据实际业务场景调个合适的数值。
除此之外,我们还可以做资源隔离,给核心消息和非核心消息分别配置独立的消费集群,避免非核心消息抢占核心业务的资源,影响关键流程。
比如支付和订单就是核心消息,日志和通知就是非核心消息。要是图省事把这两类消息堆在同一个消费集群里处理,风险就很大了:
万一某天非核心消息突然爆量,比如做活动时用户行为日志激增,或者系统升级后日志输出变多,这些消息就会像占路的大车一样,把消费集群的 CPU、内存、网络带宽全占满。
这时候核心消息再进来,就会发现没资源可用了,只能排队等着,原本几百毫秒就能处理完的订单消息,可能要等好几分钟,严重的甚至会导致订单状态同步延迟,用户付了钱却看不到订单,或者商家收不到支付通知,整个关键业务流程都会被拖慢。
所以更合理的做法是,给它们分别配置独立队列+独立消费集群。
核心消息专门用一套性能有保障、资源预留充足的集群,比如用配置更高的服务器,还提前做好弹性扩容策略,确保哪怕核心消息量突增,也能稳稳处理;
非核心消息则用另一套集群,资源配置可以根据其流量波动灵活调整,就算这类消息偶尔占满资源,也不会影响到核心集群的正常运转。这样一来,核心业务和非核心业务各走各的路,核心流程的稳定性就有了兜底,用户体验和业务可靠性也能得到保障。
优化消息设计,减少流程阻碍
消息瘦身:给消息减减肥
消息不是内容越多越好,反而简洁才高效。传递消息时,只保留业务必需的核心字段就行.
比如处理订单消息,没必要把用户的姓名、地址、联系方式等完整信息都塞进去,其实只传个用户 ID,消费端按需去数据库查详情就够了。
这样做能显著缩小单条消息的体积,尽量控制在 100KB 以内。别小看这一点,消息小了,网络传输时占用的带宽更少,队列存储时也更省空间,消费端解析起来也更快,从发送到处理的整个链路耗时都会缩短,间接减少了积压的可能性。
配置死信队列:给问题消息找个专属位置
消息处理难免会遇到临时故障,比如数据库突然连接不上、远程服务超时,这时消息会处理失败。如果让这些失败的消息一直留在原队列里反复重试,很容易阻塞后面的正常消息。
就像路上的故障车不挪走,会堵死整条路。死信队列就是专门解决这个问题的:配置好规则后,处理失败达到一定次数的消息会被自动转移到死信队列,原队列能继续处理新消息。之后可以通过定时任务批量重试死信消息,或者人工介入排查具体原因,既保证了正常流程不被打断,也不会漏掉需要处理的消息。
设置消息过期策略:让过期消息自动退场
不是所有消息都需要永久保存,有些消息过了特定时间就没价值了。比如普通的系统操作日志,超过 24 小时可能就没必要再处理了;非核心的营销通知,过了活动时间也失去了意义。给这类时效性低的消息设置 TTL(生存时间),比如 2 小时或 1 天,过期后队列会自动清理它们,不用再占用存储资源。这样一来,队列里始终是 “有价值” 的消息,存储压力小了,消息检索和消费的效率也会更高,从底层减少了积压的隐患。
做好流量控制,避免超出承载能力
如果用了上文我们的方法,消费能力上去了、也更可靠了,但还需要配合流量控制来优化。主要有两方面:
1.临时增加容量太仓库,需要提前规划,避免被打个措手不及
2.也不能无限扩容吧,终有穷尽和业务能负担的极限,不能说突然有个1000w/s的请求的瞬间高峰,立刻扩容几千台机器给支撑吧,那成本得多大
具体而言我们需要做如下部署:
提前容量规划:消息队列的处理能力不能走一步看一步,得提前历史数据提前布局。可以翻看下过去半年甚至一年的流量记录,找到业务高峰期的消息量。
比如日常每秒处理 1000 条消息,那规划时就得按 3 倍甚至更高的标准来准备,这样遇到突发流量才不会慌。具体来说,一方面要算好队列分区数,比如 Kafka 的分区得足够多,保证消费者能并行处理;另一方面给消费者服务器配足资源,CPU、内存这些不能抠门,确保就算消息量冲到峰值,消费者吃掉消息的速度也始终比生产者喂进来的快,从根上避免积压。
生产者限流:消息发送端得有刹车机制,不能一股脑把消息全灌进队列。可以用令牌桶算法来控制节奏,设定好每秒最多发多少条消息,超过这个量就先拦一下,等有令牌了再发,避免突发流量直接冲垮队列。遇到流量实在超标的情况,非核心业务就得懂让路:比如营销推送、用户行为分析这类消息,临时停掉也不影响主要交易,先把资源让给订单、支付这些核心消息,保证关键业务能正常跑。
峰值预演:大促、秒杀这种流量高峰就像考试,考前得好好复习。活动开始前一两周,最好用压测工具模拟几倍于日常的消息量,看看消费者能不能扛住,比如预计双 11 零点会有每秒 5000 条订单消息,就用压测工具发 8000条/s 试试水。如果发现处理延迟变长,就赶紧加服务器、扩线程,把资源提前配到位。这样等活动真的开始,消息处理就能从容应对,不会手忙脚乱地出现积压。
完善监控与告警,及时发现问题
前面我们讲了预防问题、解决问题,但是总有问题真出现的时候,这时候,最重要的就是能及时发现问题,在后端领域,就是通过监控来实现问题发现的及时性。
监控不能眉毛胡子一把抓,得聚焦核心监控指标,盯着最能反映队列健康状态的核心指标
- 队列层面:要重点看积压消息的数量,这就像水库的水位,一旦超过安全线就得警惕;还要盯消息增长速率,比如突然从每秒 1000 条涨到 5000 条,可能预示着潜在的积压风险;另外,消息过期比例也不能忽视,要是大量消息还没被消费就因超时被删除,可能意味着消费能力严重不足。
- 消费侧:得关注每秒处理多少条消息,这直接反映处理效率;单条消息的平均处理耗时也很关键,突然从 200ms 涨到 1 秒,可能是消费逻辑出了问题;线程池利用率、消费失败率也要实时跟踪,前者过高说明资源快耗尽,后者飙升可能意味着业务逻辑有 bug。
- 生产侧:要盯着发送 QPS 是否异常波动,发送失败率是否突然升高,如果失败率升高,可能是队列满了或网络问题,还要重点关注消息重复率,重复消息太多会浪费消费资源,也可能引发业务问题。
理清楚监控指标之后,我们就需要设置告警策略,可以给关键指标设置不同级别的阈值,比如积压消息超过 10 万条触发警告,超过 50 万条升级为紧急;消息增长速率超过 5000 条 / 秒时自动告警。
告警渠道也得多样化,轻微问题可以发钉钉或者企业微信群通知,让团队成员留意;严重问题就得直接发短信甚至电话提醒,确保责任人员能第一时间看到。目标是让任何异常都能在 5 分钟内被发现,毕竟消息积压就像滚雪球,发现得越早,处理起来越轻松,造成的影响也越小。
总结
针对消息积压这个问题,从防微杜渐、到应急救火,我们聊了不少落地思路,其实这些方法的核心逻辑很简单:通过监控+告警及时发现异常;通过提前规划+动态调整让消费能力始终跑赢消息增量;最后是用隔离+容错减少意外故障的影响范围。
之所以花时间分享这个内容,是因为平时和不少同行交流时的共鸣,很多团队在做系统异步化时,都曾栽在消息积压这个坑里:可能是大促峰值一来,队列消息堆成山,核心订单处理延迟;也可能是一条失败消息卡着队列,导致正常业务跟着受影响;更有甚者,因为没提前做好监控和规划,等发现积压时已经波及用户体验。
希望这些从实战里总结的经验,能帮大家避开消息积压的坑,让消息队列真正成为系统解耦、提效的助力,而不是业务故障的导火索,也希望这篇文章对大家工作和面试能起到帮助。
