消息队列发送消息场景分析

张开发
2026/4/4 14:09:44 15 分钟阅读
消息队列发送消息场景分析
接下来我们将继续回归到应用层面去进一步探索怎么更好的使用消息队列这里主要考虑几个问题:怎么保证消息不丢失?怎么保证消息不重复消费?怎么让消息有序消费?消息如果不幸被积压了该何去何从?1.消息队列消费语义消息队列一般实现得都是at least once语意也就是至少能消费一次比如Kafka就是如此也就是只要消息进入了Kafka那么你是很容易去实现at least once的能力的但是要考虑被重复发送的可能性。但实际上在后端业务中很多时候业务对消费的态度其实是希望能exactly once也就是精确一次语义也就是既保证消息不丢失又保证消息不重复。Kafka要进一步实现“精准一次消费”是非常困难的。这主要是因为后端任何一个环节都要考虑各种故障情况:系统故障和网络问题: 分布式系统中Broker、生产者和消费者之间的通信可能会受到网络延迟、中断或系统故障等等异常情况的影响。这些问题可能导致消息重复发送、接收或处理。并发处理:Kafka支持高并发处理多个消费者可以同时消费同一个主题的消息。在并发环境中确保每条消息只被处理一次需要复杂的协调和同步机制。(在Kafka中同一个消费者组内的消费者之间是可以做到每条消息只消费一次的,所以上述的情况应该是不考虑消费组全部都是单独的消费者的时候)消费者处理逻辑:消费者在处理消息时可能会出现异常或错误导致消息处理失败。如果消费者没有正确处理这些情况(例如没有提交偏移量)那么消息可能会被重复处理。因此虽然Kafka可以通过一系列机制保证了消息的至少一次消费但由于分布式系统的复杂性和不确定性它无法承诺每条消息都被精确处理一次那怎么办呢?显然要实现精准一次消费实际光靠消息队列是做不到的需要消息队列与业务逻辑配合完成实际做法就是消息队列至少一次消费业务提供幂等性这种这种消息队列与业务配合的做法就能实现业务逻辑上的准确一次消费具体内容后面我们会展开叙述。2.如何保证消息不丢失?at least once即最少一次语义消息一定不丢但是有可能重复。怎么实现 at least once呢?我们可以反过来想什么情况下消息会丢失呢?实际上消息丢失有如下几个环节:生产环节存储环节消费环节生产环节消息生产环节也就是生产端把消息发送到Broker这个环节先说结论这个环节和Kafka这个应用的关系不大毕竟发消息是在Kafka客户端所以Kafka很难保证完全可靠唯一能做的是发送消息之后必须得到响应不然就反复重试。但是如果没重试生产服务就挂掉了那么还会丢消息。要解决这个问题只能从消息源入手比如你的数据是从数据库拿过来发那么发完就得记录下来没有记录的数据还会再次发送。就一个结论Kafka本身是没有什么机制能保证生产一定成功的需要有额外的确认机制才可以原因是这个环节是在数据进入消息队列之前甚至可能因为某些原因都没发送过数据所以不该由消息队列来保证所以这个结论也不影响消息队列的可靠性。存储环节存储数据不丢失也就是进去消息队列之后的数据是持久的Kafka这种消息队列是非常可靠的只要写入了队列就不会丢消息这个依托于持久化存储:Kafka的消息确认机制还依赖于其持久化存储能力。一旦消息被写入并确认(根据所选的确认级别)它就会被持久化存储到磁盘中。这意味着即使系统发生故障或重启已经确认的消息也不会丢失。它就可以被消费者至少消费一次。当然这里还有个生产者侧的写入配置策略这里稍微扩展一下,其中下图实线箭头表示需要操作完成虚线箭头表示不用等待操作完成:这里再扩展下Kafka实际上还通过副本机制让持久化数据更可靠即将每个主题划分为多个分区并为每个分区创建多个副本。这确保了即使部分Broker发生故障消息仍然可以从其他副本中恢复并被消费者消费。注意: 创建的多个副本是放在多个broker中的:每个主题在Kafka中被划分为多个分区每个分区可以有多个副本(一般设置为3个即可)。其中一个是Leader副本用于接收读写请求而其他的是Follower副本用于备份数据。当生产者发送消息到某个分区时消息首先被写入Leader副本并等待确认。根据所选的确认级别生产者可能会等待Leader确认或所有ISR确认。消费环节Kafka提供了偏移量管理功能Kafka消费者通过提交每个分区的偏移量来跟踪已经消费的消息即使消费者在处理消息是发生故障重新启动后他仍然可以从最后一个提交的偏移量处继续消费确保消息至少被处理一次。相比于自动提交另一种方式是手动提交也就是由消费者业务代码自己控制提交时机主动调用函数来提交。3.如何让消息不重复?我们先来看看什么场景下消息会重复消费:生产阶段: 发送时就重复了比如网络波动生产者就重复发生。存储阶段: 在Kafka中存储的阶段是不会重复的消费阶段: 业务收到消息之后因为各种原因不能及时提交偏移后面又拉到了相同消息幂等性生产Kafka的幂等性生产至在相同的Producer中同一条消息即使被多次发送到Kafka也只会被写入一次。Kafka 为了实现幂等性引入了两个关键的内部标识PID (Producer ID)每个生产者在初始化时Broker 会分配一个唯一的 PID。这相当于生产者的“身份证号”。Sequence Number (序列号)生产者发往每个分区Partition的消息都会附带一个从 0 开始递增的序号。Broker 端的判断逻辑Broker 会在内存中记录每个PID Partition对应的“当前最大序列号”。如果新消息的Broker 接受并写入。如果Broker 认为这是重复发送直接丢弃但返回成功回执以免生产者继续重试。如果说明中间丢消息了Broker 会报错。那么我们该如何启动幂等性生产呢?只需要在生产端enable.idempotence true// 启用幂等性生产 props.put(enable.idempotence, true); KafkaProducerString, String producer new KafkaProducer(props);幂等生产的局限性1.只能同一台机器才有用比如一个生产者因为网络波动三次重复发送这种情况就有效果。但是如果重复消息是由不同生产者者发出那就无法达到去重的效果比如我们多台机器从数据库里拉定时任务来发2台机器拉到了相同的任务都发给Kafka此时因为他们属于不同生产者Producer ID是不一样的所以无法达到去重的效果。2.同一台生产者机器如果发生了重启也不行就算是同一台生产者机器如果发生了重启其实在Kafka眼里也是不同生产者了Producer ID是不一样的所以无法达到去重的效果。3.同一台生产者机器消息丢失也不行由于开启了幂等生产在同一分区中同一生产者消息的编号需要是连续的也就意味着中间不能丢消息所以通常情况下开启幂等性也会启用 acksall和retries 设置来尽可能确保消息是可以连续的不然如果中间出现空洞整个流程就会卡住需要有专门的程序或者人工介入。综上所述幂等性生产是比较有局限性的他只能减少重复的概率而无法杜绝重复同时还会带来更严格的连续性限制是否开启要看具体场景。实际上一般后端开发中最后依赖的都是幂等性消费。即如果你真的希望消息不重复无论你是否开启幂等性生产最后都是靠幂等消费来兜底下面的内容我们会展开介绍幂等性消费。幂等性消费幂等性是后端设计一个重要原则因为网络是不可靠的前置操作(比如请求到MySQLMySQL之前的流程就是前置操作也不知道有哪些环节会重复)也是不可靠的也就是说一个相同的请求总有概率会重放到你的服务这种情况是需要考虑的。一般而言我们从接口层面保证幂等性一个接口可重入的基础就是它底层操作都是可重入的。但是这里要着重理解下幂等性消费和Kafka其实没有太大关系实际是业务通用手段下面给大家介绍一下Redis、MySQL中的幂等处理方法这两个组件也是业务消费阶段最长打交道的。如果存储是Redis如何进行幂等处理Redis常见实现幂等性的思路有如下三种唯一标识符:为每个消息分配一个全局唯一的标识符(如UUID)Redis Set:将已消费的消息ID存储在Redis的Set数据结构中。每次消费消息前检查该消息ID是否已存在于Set中。原子操作:使用Redis的原子操作(如SISMEMBER和SADD)来检查和添加消息ID确保操作的原子性。如果存储用MySQL如何进行幂等处理MySQL常见实现幂等性的思路有如下三种:唯一约束:在MySQL表中为消息ID创建一个唯一约束。INSERT IGNORE:使用 INSERT IGNORE 或 ON DUPLICATE KEY UPDATE语句来尝试插入消息记录。如果消息ID已存在则忽略或更新该记录。事务:在需要的情况下使用事务来确保多个操作的原子性。流量优化如果业务是用MySQL做存储那就可以参考上面MySQL幂等处理的思路另外这里其实有个小的优化点如果重复请求过多那么MySQL凭白无故会多承担这部分压力而MySQL这个数据库的性能是比较宝贵的所以可以加一层Redis过滤来优化:简单来说在Redis中缓存已经处理过的唯一Key压力到MySQL之前先做一次检查如果存在于Redis那么就不用到MySQL了如果不存在请求就继续打到MySQL依赖MySQL的幂等做最后兜底。4. 如何让消息有序可能有同学会问什么场景要消息有序只要有前后依赖的消息就需要有序比如我们最简单的一个资金系统假设一个用户的资金本来是1000他先充值1000再转账2000。如果按正确的顺序充值1000账面变成2000再转账2000是不是两个操作都是成功的?假设执行顺序反了就变成先转账2000此时账面上才1000显然是会失败的顺序对结果产生了影响这就是为什么需要消息有序。Kafka一个主题的全局可以看作无序的因为很多不同Partition用于存储但是在同一个Partition的消息显然是有序的。所以我们希望在业务上的消息有序就需要在Partition的路由上动手脚。简单粗暴一种最简单的做法就是根据业务确定分区即每类业务自己一个分区这样就可以实现业务消息有序。具体而言即将业务所有消息都指定同一个分区Key这样一来所有消息都会添加至同一个Partition这样就达到了我们的目的比如秒杀业务像下面这样向tp-seckill这个Topic发送消息都用分区Key:“aaabbbccc”来发送。kafkaTemplate.send(tp-seckill,aaabbbccc,msg);业务内分区我们前面说了可以根据业务分区而如果单个业务压力过大我们就要考虑业务内再次切分。这里可以类比一下MySQL的分表其实会发现差不多本质都是将集合进一步做切片以寻求更大的并发能力和吞吐量。下面我们就讲一下消息队列怎么以分表的思路做分区。分区思路---子业务分区按业务分了之后我们其实还可以深挖是否可以按子业务拆分比如金融服务分为风控子业务一个分区支付子业务一个分区消息服务分为短信通知分区微信通知分区。总结来说即探索将大颗粒拆得更小更细以便进一步扩展这就是子业务分区。// 短信信用分区Key: msg-aaabbbccc kafkaTemplate.send(tp-msg, msg-aaabbbccc, msg); // 微信信用分区Key: wechat-aaabbbccc kafkaTemplate.send(tp-msg, wechat-aaabbbccc, msg);分区思路---客户分区如果对于客户而言数据是比较独立的客户之间没什么交互我们还可以通过客户来进行分区。符合这种场景的业务就可以按客户来进行分区比如最简单的按用户id%100来分这样就可以划出来100个分区比如用户1001就用分区Key:user-mod-1,用户2002就用分区Key:user-mod-2。// 如果userid % 100 1就用分区Keyuser-mod-1比如用户1001 kafkaTemplate.send(tp-body, user-mod-1, msg); // 如果userid % 100 2就用分区Keyuser-mod-2比如用户2002 kafkaTemplate.send(tp-body, user-mod-2, msg);这种方式有一个缺点增减节点会对原有的路由分布会造成冲击比如原来9个分区用户id是10那算出来就是10%91号分区假设增加一个分区该用户就路由到10%100号分区也就是说当节点增加或减少原来的路由基本上是全乱了。如果要解决这个问题可以考虑参考Redis哈希槽的做法:引入槽的概念比如约定16384个槽用用户id进行Hash计算然后%16384这样就可以关联到算法关联到某个槽。分区思路---大小客户分区如果单纯按客户来分能解决大多数的问题但是我们设想这么一种场景某个业务有100个客户90个客户每天产生100条消息剩下10个客户每天产生10000000条消息也就是发生了大客户扎堆现象这样压力还是集中在同一个Partition。这种情况就可以考虑给大客户提供单独的分区比如我们知道ID为user-123的用户是一个大客户就单独为它指定一个分区user-big不和其它客户的user-common一起卷相当于是VIP通道了。5.消息积压怎么办异常消息导致的阻塞如果是顺序消费场景是依赖每条消息处理成功的如果某条消息是有问题的那就会一直堵在这里就像被放毒了一样。至于具体是什么问题这就千奇百怪了可能是交易依赖某个资源不足可能是代码存在什么特殊的bug刚好这笔交易的某个数值触发了这个bug需要消费服务升级才能解决。这种异常是直接卡死了整个消息队列影响比较严重此时就需要尽快介入升级消费端代码将这笔消息合理地处理掉。非核心模块拖后腿如果消费链路上还依赖了不那么核心的业务因为这些业务拉慢了消费速度在积压情况下就可以先进行系统降级也就是砍掉相关逻辑加快数据消费。举个例子我们是一个秒杀消费场景秒杀消费链路中有个数据统计上报这个功能并不是核心业务在需要更高性能时候可以给这个功能停掉这就要求代码里是具备了对应的开关功能的可以快速降级。怎么实现降级?有一个动态配置switch运行到这里的时候会先去读一下这个配置if里面就可以通过配置决定走不走下面的流程压力超过可承载范围单纯压力太大扛不住了此时可以做几件事情:快速加资源扩容加钱解决问题一般而言都是消费能力跟不上也就是需要增加消费者对应的资源这个前提是要你的消费者服务是可以水平扩容的不然增加机器也没用。设置中间消费者放置在真正的消费者和kafka之间中间消费者不处理消费逻辑仅仅只是提交offset同时保证将消息保存到别的地方比如内存磁盘数据库Redis其它kafka。这样可以一定程度缓解kafka存储的积压注意只是缓解争取到了一些解决问题的时间还是得想办法提高消费速度(实际不常使用)保新。考虑这种场景因为某个业务做活动大量消息在几分钟内打入消息队列积压了几亿条数据假设你没有钱扩容按现有能力要处理掉这些消息需要1小时然后此时每秒还有其它业务的消息我们就假设1000/s此时这些消息一旦进入消息队列相当于会被前面的积压堵住。这时候如果业务允许其实可以选择保新即新消息导入到新的消息队列这样新消息可以得到及时处理而旧消息就等1小时之后慢慢消耗掉即可。保新的缺点是kafka的日志清除日志压缩策略旧的未处理消息可能会丢失不开启清除压缩的话磁盘可能吃不消。总之要看处理能力数据量数据重要程度数据处理的紧急程度来取舍。

更多文章