1. 幂等性用户对于同一操作发起的一次请求或者多次请求的结果是一致的不会因为多次点击而产生了副作用。 举个最简单的例子那就是支付用户购买商品后支付支付扣款成功但是返回结果的时候网络异常 此时钱已经扣了用户再次点击按钮此时会进行第二次扣款返回结果成功用户查询余额发现多扣钱 了流水记录也变成了两条。在以前的单应用系统中我们只需要把数据操作放入事务中即可发生错误立即回滚但是再响应客户端的时候也有可能出现网络中断或者异常等等。消息幂等性其实就是保证同一个消息不被消费者重复消费两次1.1 消息重复消费重复投递重复投递生产在往MQ发送消息时MQ收到消息并持久化到本地后进行发布确认告诉生产者消息已经被持久化的过程中出现网络中断生产者没有收到消息发布确认的消息故而重新发送一条消息重复消费消费者在消费 MQ 中的消息时MQ 已把消息发送给消费者消费者在给 MQ 返回 ack 时网络中断 故 MQ 未收到确认信息该条消息会重新发给其他的消费者或者在网络重连后再次发送给该消费者但实际上该消费者已成功消费了该条消息造成消费者消费了重复的消息。1.2 解决思路MQ 消费者的幂等性的解决一般使用全局 ID 或者写个唯一标识比如时间戳或者 UUID 订单消费者消费 MQ 中的消息也可利用 MQ 的该 id 来判断或者可按自己的规则生成一个全局唯一 id每次消费消息时用该 id 先判断该消息是否已消费过。1.3 消费端的幂等性保障在海量订单生成的业务高峰期生产端有可能就会重复发生了消息这时候消费端就要实现幂等性 这就意味着我们的消息永远不会被消费多次即使我们收到了一样的消息。业界主流的幂等性有两种操作:方式1: 消息全局 ID 或者写个唯一标识(如时间戳、UUID 等) 每次消费消息之前根据消息 id 去判断该消息是否已消费过如果已经消费过则不处理这条消息否则正常消费消息并且进行入库操作。(消息全局 ID 作为数据库表的主键防止重复)。这里可以结合业务根据业务的唯一ID消息的业务需求拼接成唯一ID。在插入的时候通过主键校验来避免重复投递在消费的时候通过状态判断来避免重复消费方式2: 利用 Redis 的 setnx 命令给消息分配一个全局 ID消费该消息时先去 Redis 中查询有没消费记录无则以键值对形式写入 Redis 有则不消费该消息。1.4 唯一 ID 代码演示1.4.1 配置spring.rabbitmq.host192.168.0.68spring.rabbitmq.port5672spring.rabbitmq.usernameadminspring.rabbitmq.passwordadmin spring.rabbitmq.virtual-host/# 开启消息发布确认机制spring.rabbitmq.publisher-confirm-typecorrelated# 发布消息返回监听回调spring.rabbitmq.publisher-returnstrue# 指定消息确认模式spring.rabbitmq.listener.simple.acknowledge-modemanual# 未正确路由的消息发送到备份队列# 使用备份交换机模式mandatory 将无效即就算 mandatory设 置为 false路由失败的消息同样会被投递到绑定的备份交换机spring.rabbitmq.template.mandatorytrue1.4.2 队列和交换机配置ConfigurationpublicclassRevisitConfig{/** * 创建 direct 队列 * */BeanQueueDirectQueue01(){returnnewQueue(DirectQueue-01,true);}/** * 创建 direct 交换机 * */BeanDirectExchangeDirectExchange01(){returnnewDirectExchange(DirectExchange-01);}/** * 绑定 direct 队列和交换机 * */BeanBindingbindingDirect01(){returnBindingBuilder.bind(DirectQueue01()).to(DirectExchange01()).with(DirectRouting01);}}1.4.3 自定义消息应答回调方法ComponentSlf4jpublicclassMyCallbackimplementsRabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{AutowiredprivateRabbitTemplaterabbitTemplate;//依赖注入 rabbitTemplate 之后再设置它的回调对象// 此注解会在其他注解执行完成后再执行所以rabbitTemplate先注入再执行此初始化方法PostConstructpublicvoidinit(){// 设置rabbitTemplate的ConfirmCallBack为我们重写后的类rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);}/** * 交换机不管是否收到消息都会执行的一个回调方法 * * param correlationData 消息相关数据 * param ack 交换机是否收到消息 * param cause 未收到消息的原因 */Overridepublicvoidconfirm(CorrelationDatacorrelationData,booleanack,Stringcause){StringidcorrelationData!null?correlationData.getId():;if(ack){log.info(交换机已经收到 id 为:{}的消息,id);}else{log.info(交换机还未收到 id 为:{}消息原因:{},id,cause);}}// 确认消息是否从交换机成功到达队列中失败将会执行成功则不执行OverridepublicvoidreturnedMessage(Messagemessage,intreplayCode,StringreplayText,Stringexchange,StringroutingKey){log.info(消息{}被交换机{}退回退回原因{}路由key,newString(message.getBody()),exchange,replayText,routingKey);}}1.4.4 数据库对象相关配置数据库脚本CREATETABLEmessage_idempotent(message_idvarchar(50)NOTNULLCOMMENT消息ID,message_contentvarchar(2000)DEFAULTNULLCOMMENT消息内容,PRIMARYKEY(message_id))ENGINEInnoDBDEFAULTCHARSETutf8;对象DataNoArgsConstructorAllArgsConstructorpublicclassMessageIdempotentextendsModelMessageIdempotent{TableId(message_id)privateStringmessageId;TableField(message_content)privateStringmessageContent;}mapperMapperpublicinterfaceMessageIdempotentMapperextendsBaseMapperMessageIdempotent{}1.4.5 生产者编写/** * 消息幂等性 * */GetMapping(/sendMessage)publicvoidsendMessage(Stringmsg,Stringid){MessagePropertiesmessagePropertiesnewMessageProperties();messageProperties.setMessageId(id);messageProperties.setContentType(text/plain);messageProperties.setContentEncoding(utf-8);MessagemessagenewMessage(msg.getBytes(),messageProperties);log.info(生产消息:message.toString());// 消息发送确认回调CorrelationDatacorrelationDatanewCorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(DirectExchange-01,DirectRouting01,message,correlationData);}访问接口http://localhost:8091/shiro/revisit/sendMessage?msg你好啊id1http://localhost:8091/shiro/revisit/sendMessage?msgid1日志此处有confirmCallback未回调问题待解决按道理打印完生产消息后应该打印交换机已经收到 id 为:{}的消息2023-04-1014:31:12.859INFO19232---[nio-8091-exec-1]c.y.t.r.TestRevisit.RevisitController:生产消息:(Body:你好啊MessageProperties[headers{},messageId1,contentTypetext/plain,contentEncodingutf-8,contentLength0,deliveryModePERSISTENT,priority0,deliveryTag0])2023-04-1014:31:29.002INFO19232---[nio-8091-exec-2]c.y.t.r.TestRevisit.RevisitController:生产消息:(Body:MessageProperties[headers{},messageId1,contentTypetext/plain,contentEncodingutf-8,contentLength0,deliveryModePERSISTENT,priority0,deliveryTag0])客户端中1.4.6 消费者编写RabbitListener(queuesDirectQueue-01)publicvoidreceiveMessage02(Messagemessage,Channelchannel)throwsIOException{StringmessageIdmessage.getMessageProperties().getMessageId();StringmessageContentnewString(message.getBody(),StandardCharsets.UTF_8);MessageIdempotentmessageIdempotentnewMessageIdempotent();messageIdempotent.setMessageId(messageId);messageIdempotent.setMessageContent(messageContent);try{if(messageIdempotentMapper.insert(messageIdempotent)0){log.info(DirectQueue-01-消费者收到消息消息IDmessageId 消息内容messageContent);// 消息确认channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}else{log.info(消息 messageId 已经消费过);}}catch(Exceptione){log.info(消息 messageId 已经消费过);}}结果2023-04-1014:47:06.738INFO25416---[ntContainer#6-1]c.y.t.r.TestRevisit.RevisitConsumer:DirectQueue-01-消费者收到消息消息ID1消息内容你好啊2023-04-1014:47:06.745INFO25416---[ntContainer#6-1]c.y.t.r.TestRevisit.RevisitConsumer:消息1已经消费过数据库中队列中1.5 note Redis 原子性利用 redis 执行 setnx 命令天然具有幂等性从而实现不重复消费。利用redis的操作的好处是缓存更快。代码这里不再演示无非是一个插入数据库一个setnx进redis。2. 消息丢失2.1 消息丢失的场景第一种生产者弄丢了数据。生产者将数据发送到 RabbitMQ 的时候可能数据就在半路给搞丢了因为网络问题啥的都有可能。第二种RabbitMQ 弄丢了数据。MQ还没有持久化自己挂了第三种消费端弄丢了数据。刚消费到还没处理结果进程挂了比如重启了。2.2 RabbitMQ消息丢失解决方案2.2.1 针对生产者1. 方案1 开启RabbitMQ事务可以选择用 RabbitMQ 提供的事务功能就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect然后发送消息如果消息没有成功被 RabbitMQ 接收到那么生产者会收到异常报错此时就可以回滚事务channel.txRollback然后重试发送消息如果收到了消息那么可以提交事务channel.txCommit。// 开启事务channel.txSelecttry{// 这里发送消息}catch(Exceptione){channel.txRollback// 这里再次重发这条消息}// 提交事务channel.txCommit缺点RabbitMQ 事务机制是同步的你提交一个事务之后会阻塞在那儿采用这种方式基本上吞吐量会下来因为太耗性能。2. 方案2 使用confirm机制事务机制和 confirm 机制最大的不同在于事务机制是同步的你提交一个事务之后会阻塞在那儿但是 confirm 机制是异步的在生产者开启了confirm模式之后每次写的消息都会分配一个唯一的id然后如果写入了rabbitmq之中rabbitmq会给你回传一个ack消息告诉你这个消息发送OK了如果rabbitmq没能处理这个消息会回调你一个nack接口告诉你这个消息失败了你可以进行重试。而且你可以结合这个机制知道自己在内存里维护每个消息的id如果超过一定时间还没接收到这个消息的回调那么你可以进行重发。即第一节MyCallback中/** * 交换机不管是否收到消息都会执行的一个回调方法 * * param correlationData 消息相关数据 * param ack 交换机是否收到消息 * param cause 未收到消息的原因 */Overridepublicvoidconfirm(CorrelationDatacorrelationData,booleanack,Stringcause){StringidcorrelationData!null?correlationData.getId():;if(ack){log.info(交换机已经收到 id 为:{}的消息,id);}else{log.info(交换机还未收到 id 为:{}消息原因:{},id,cause);}}2.2.2 针对RabbitMQ说三点要保证rabbitMQ不丢失消息那么就需要开启rabbitMQ的持久化机制即把消息持久化到硬盘上这样即使rabbitMQ挂掉在重启后仍然可以从硬盘读取消息如果rabbitMQ单点故障怎么办这种情况倒不会造成消息丢失这里就要提到rabbitMQ的3种安装模式单机模式、普通集群模式、镜像集群模式这里要保证rabbitMQ的高可用就要配合HAPROXY做镜像集群模式如果硬盘坏掉怎么保证消息不丢失1. 消息持久化RabbitMQ 的消息默认存放在内存上面如果不特别声明设置消息不会持久化保存到硬盘上面的如果节点重启或者意外crash掉消息就会丢失。所以就要对消息进行持久化处理。如何持久化下面具体说明下要想做到消息持久化必须满足以下三个条件缺一不可。Exchange 设置持久化Queue 设置持久化Message持久化发送发送消息设置发送模式deliveryMode2代表持久化消息2. 设置集群镜像模式我们先来介绍下RabbitMQ三种部署模式单节点模式最简单的情况非集群模式节点挂了消息就不能用了。业务可能瘫痪只能等待。普通模式消息只会存在与当前节点中并不会同步到其他节点当前节点宕机有影响的业务会瘫痪只能等待节点恢复重启可用必须持久化消息情况下。镜像模式消息会同步到其他节点上可以设置同步的节点个数但吞吐量会下降。属于RabbitMQ的HA方案为什么设置镜像模式集群因为队列的内容仅仅存在某一个节点上面不会存在所有节点上面所有节点仅仅存放消息结构和元数据。下面自己画了一张图介绍普通集群丢失消息情况如果想解决上面途中问题保证消息不丢失需要采用HA 镜像模式队列。下面介绍下三种HA策略模式同步至所有的同步最多N个机器只同步至符合指定名称的nodes命令处理HA策略模版rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]为每个以“rock.wechat”开头的队列设置所有节点的镜像并且设置为自动同步模式rabbitmqctl set_policy ha-all^rock.wechat{ha-mode:all,ha-sync-mode:automatic} rabbitmqctl set_policy-p rock ha-all^rock.wechat{ha-mode:all,ha-sync-mode:automatic}为每个以“rock.wechat.”开头的队列设置两个节点的镜像并且设置为自动同步模式rabbitmqctl set_policy-p rock ha-exacly^rock.wechat\ {ha-mode:exactly,ha-params:2,ha-sync-mode:automatic}为每个以“node.”开头的队列分配指定的节点做镜像rabbitmqctl set_policy ha-nodes^nodes\.\ {ha-mode:nodes,ha-params:[rabbitnodeA,rabbitnodeB]}但是HA 镜像队列有一个很大的缺点就是 系统的吞吐量会有所下降3. 消息补偿机制为什么还要消息补偿机制呢难道消息还会丢失没错系统是在一个复杂的环境不要想的太简单了虽然以上的三种方案基本可以保证消息的高可用不丢失的问题但是作为有追求的程序员来讲要绝对保证我的系统的稳定性有一种危机意识。比如持久化的消息保存到硬盘过程中当前队列节点挂了存储节点硬盘又坏了消息丢了怎么办生产端首先将业务数据以及消息数据入库需要在同一个事务中消息数据入库失败则整体回滚。字段包括消息id消息状态重试次数创建时间等根据消息表中消息状态失败则进行消息补偿措施重新发送消息处理。2.2.3 针对消费者1. 方案一ACK确认机制多个消费者同时收取消息比如消息接收到一半的时候一个消费者死掉了(逻辑复杂时间太长超时了或者消费被停机或者网络断开链接)如何保证消息不丢使用rabbitmq提供的ack机制服务端首先关闭rabbitmq的自动ack然后每次在确保处理完这个消息之后在代码里手动调用ack。这样就可以避免消息还没有处理完就ack。才把消息从内存删除。这样就解决了即使一个消费者出了问题但不会同步消息给服务端会有其他的消费端去消费保证了消息不丢的case。2.3 总结如果需要保证消息在整条链路中不丢失那就需要生产端、mq自身与消费端共同去保障。生产端对生产的消息进行状态标记开启confirm机制依据mq的响应来更新消息状态使用定时任务重新投递超时的消息多次投递失败进行报警。mq自身开启持久化并在落盘后再进行ack。如果是镜像部署模式需要在同步到多个副本之后再进行ack。消费端开启手动ack模式在业务处理完成后再进行ack并且需要保证幂等。通过以上的处理理论上不存在消息丢失的情况但是系统的吞吐量以及性能有所下降。在实际开发中需要考虑消息丢失的影响程度来做出对可靠性以及性能之间的权衡。3. 消息积压所谓消息积压一般是由于消费端消费的速度远小于生产者发消息的速度导致大量消息在 RabbitMQ 的队列中无法消费。其实这玩意我也不知道为什么面试这么喜欢问…既然消费者速度跟不上生产者那么提高消费者的速度就行了呀个人认为有以下几种思路对生产者发消息接口进行适当限流不太推荐影响用户体验多部署几台消费者实例推荐适当增加 prefetch 的数量让消费端一次多接受一些消息推荐可以和第二种方案一起用4. 消息消费顺序性问题