侧边栏壁纸
博主头像
再见理想博主等级

只争朝夕,不负韶华

  • 累计撰写 112 篇文章
  • 累计创建 64 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

RabbitMQ消息堆积

再见理想
2022-05-27 / 0 评论 / 0 点赞 / 1,091 阅读 / 991 字

一,前言

当消息生产的速度长时间,远远大于消费的速度时。就会造成消息堆积。

二,影响

①可能导致新消息无法进入队列;

②消息等待消费的时间过长,超出了业务容忍范围。

三,产生堆积的情况

①生产者突然大量发布消息;

②消费者挂掉或消费失败;

③消费者出现性能瓶颈;

四,解决办法

①排查消费者的消费性能瓶颈;

②部署增加多个消费者;

③增加消费者的多线程处理

④可以合理配置concurrencyprefetch参数 一定程度解决消息堆积问题(对消费无顺序要求的消息)。

五,消息已经堆积如何解决

消息队列堆积,想办法把消息转移到一个新的队列,增加服务器慢慢来消费这个消息。

concurrency 和 prefetch 参数

默认情况下,rabbitmq消费者为单线程串行消费,这也是队列的特性。源码在org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer 中可见:

当消费端处理缓慢造成的消息堆积就可以通过设置并发消费提高消费的速率,从而减少消息堆积的问题。
设置并发消费两个关键属性concurrentConsumers 和 prefetchCount:

concurrentConsumers

设置的是对每个listener在初始化的时候设置的并发消费者的个数

prefetchCount

是每次一次性从broker里面取的待消费的消息的个数。prefetchCount 是 BlockingQueueConsumer内部维护的一个阻塞队列 LinkedBlockingQueue 的大小,其作用就是如果某个消费者队列阻塞,就无法接收新的消息,该消息会发送到其它未阻塞的消费者。

文档解释

prefetchCount(prefetch)
The number of messages to accept from the broker in one socket frame. The higher this is the faster the messages can be delivered, but the higher the risk of non-sequential processing. Ignored if the acknowledgeMode
is NONE. This will be increased, if necessary, to match the txSize

concurrentConsumers(concurrency)
The number of concurrent consumers to initially start for each listener.

简单解释下就是 concurrency 设置的是对每个 listener 在初始化的时候设置的并发消费者的个数, prefetch 是每次从一次性从 broker 里面取的待消费的消息的个数。

上面的配置在监控后台看到的效果如下:

图中可以看出有两个消费者同时监听Queue,但是注意这里的消息只有被一个消费者消费掉就会自动 ack,另外一个消费者就不会再获取到此消息,Prefetch Count为配置设置的值3,意味着每个消费者每次会预取3个消息准备消费。

每个消费者对应的 listener 有个 Exclusive 参数,默认为 false, 如果设置为 true,concurrency 就必须设置为1,即只能单个消费者消费队列里的消息,适用于必须严格执行消息队列的消费顺序(先进先出)。

深入分析(看看源码):
container启动的时候会根据设置的 concurrency 的值(同时不超过最大值)创建n个BlockingQueueConsumer。

那么prefetch的值意味着什么呢?

BlockingQueueConsumer 内部应该维护了一个阻塞队列 BlockingQueue,prefetch 应该是这个阻塞队列的长度。看下BlockingQueueConsumer 内部有个 queue,这个 queue 不是对应 RabbitMQ 的队列,而是Consumer 自己维护的内存级别的队列,用来暂时存储从 RabbitMQ 中取出来的消息。

每个 BlockingQueueConsumer 消费者内部的队列大小就是 prefetch 的大小。

业务问题:可以合理配置 concurrency 和 prefetch 参数 一定程度解决消息堆积问题(对消费无顺序要求的消息)。

0

评论区