前言
在实际的生产环境中有可能出现一条消息因为一些原因丢失,导致消息没有消费成功,从而造成数据不一致等问题,造成严重的影响。
消息丢失的场景主要分为:消息在生产者丢失,消息在RabbitMQ丢失,消息在消费者丢失。
一,消息在生产者丢失
场景介绍
消息生产者发送消息成功,但是 MQ 没有收到该消息,消息在从生产者传输到 MQ 的过程中丢失,一般是由于网络不稳定的原因。
解决方案
方案一:RabbitMQ 提供的事务功能
就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect
,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback
,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit
。
缺点:开启 rabbitmq 事务(同步),就会变为同步阻塞操作
,生产者会阻塞等待是否发送成功,太耗性能会造成吞吐量的下降。
方案二:开启commit消息确认模式
在生产者那里设置开启 confirm 模式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会回传一个 ack
消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调一个 nack
接口,告诉你这个消息接收失败,可以进行重试。
而且可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么可以重发。
两种方式区别
事务机制是同步
的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步
的,你发送个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收了之后会异步回调你的一个接口通知你这个消息接收到了。
spring 整合 RabbitMQ 后只使用了异步监听确认模式。
二,消息在RabbitMQ丢失
场景介绍
消息成功发送到MQ,消息还没被消费却在MQ中丢失,比如MQ服务器宕机
或者重启
会出现这种情况;
解决方案
持久化交换机
,队列,消息,确保MQ服务器重启时依然能从磁盘恢复对应的交换机,队列和消息。
spring整合后默认开启了交换机,队列,消息的持久化
,所以不修改任何设置就可以保证消息不在RabbitMQ丢失。但是为了以防万一,还是可以申明下。
设置持久化有两个步骤:
①创建queue的时候将其设置为持久化的,这样就可以保证rabbitmq持久化queue的元数据,但是不会持久化queue里面的数据。
②发送消息的时候讲消息的 deliveryMode 设置为2,这样消息就会被设为持久化方式,此时rabbitmq就会将消息持久化到磁盘上。
必须要同时开启这两个才可以。
而且持久化可以跟生产的 confirm 机制配合起来,只有消息持久化到了磁盘之后,才会通知生产者 ack,这样就算是在持久化之前 rabbitmq 挂了,数据丢了,生产者收不到ack回调也会进行消息重发。
三,消息在消费者丢失
场景介绍
消息费者消费消息时,如果设置为自动回复 MQ,消息者端收到消息后会自动回复 MQ 服务器,MQ 则会删除该条消息,如果消息已经在 MQ 被删除但是消费者的业务处理出现异常或者消费者服务宕机,那么就会导致该消息没有处理成功从而导致该条消息丢失。
解决方案
使用 rabbitmq 提供的 ack机制,首先关闭 rabbitmq 的自动 ack,然后每次在确保处理完这个消息之后,在代码里手动调用ack。
这样就可以避免消息还没有处理完就ack。
设置为手动回复MQ服务器,当消费者出现异常或者服务宕机时,MQ服务器不会删除该消息,而是会把消息重发给绑定该队列的消费者,如果该队列只绑定了一个消费者,那么该消息会一直保存在MQ服务器,直到消息者能正常消费为止。
本解决方案以一个队列绑定多个消费者为例来说明,一般在生产环境上也会让一个队列绑定多个消费者也就是工作队列模式来减轻压力,提高消息处理效率
MQ重发消息场景
①消费者未响应ACK,主动关闭频道或者连接;
②消费者未响应ACK,消费者服务挂掉。
评论区