技术干货分享:RabbitMQ消息积压的几种解决思路

createh511小时前技术教程1

在日常工作中使用RabbitMQ偶尔会遇不可预料的情况导致的消息积压,一般出现消息积压基本上分为几种情况:

  1. 消费者消费消息的速度赶不上生产速度,这种问题主要是业务逻辑没设计好消费者和生产者之间的平衡,需要改业务流程或逻辑以保证消费度跟上生产消息的速,譬如增加消费者的数量等。
  2. 消费者出现异常,导致一直无法接收新的消息,这种问题需要排查消费的逻辑是不是有问题,需要优化程序。

除了上面的这两种问题,还有一些其他情况会导致消息积压,譬如一些系统是无法预计成产消息的速度和频率,又或者消费者的速度已经被限制,不能通过加新的消费者来解决,譬如不同的系统间的API对接,对接那一方就做了请求频率的限制,或者对方系统承受不了太大的并发,还有一些系统如果是面对企业客户,譬如电商,物流,仓储等类似平台系统的客户的下单是没有规律的或者集种某一个时间段下单的,这种就不能简单的通过加消费者来解决,就需要分析具体业务来避免消息积压。

针对这种情况,我想到了4中解决思路:

  1. 拆分MQ,生产者一个MQ,消费者一个MQ,写一个程序监听生产者的MQ模拟消费速度(譬如线程休眠),然后发送到消费者的MQ,如果消息积压则只需要处理生产者的MQ的积压消息,不影响消费者MQ
  2. 拆分MQ,生产者一个MQ,消费者一个MQ,写一个程序监听生产者的MQ,定义一个全局静态变量记录上一次消费的时间,如果上一次时间和当前时间只差小于消费者的处理时间,则发送到一个延迟队列(可以使用死信队列实现)发送到消费者的MQ,如果消息积压则只需要处理生产者的MQ的积压消息,不影响消费者MQ
  3. 使用Redis的List或ZSET做接收消息缓存,写一个程序按照消费者处理时间定时从Redis取消息发送到MQ
  4. 设置消息过期时间,过期后转入死信队列,写一个程序处理死信消息(重新如队列或者即使处理或记录到数据库延后处理)

其中使用延时队列会相对来说逻辑简单,业务逻辑变更也不大,在RabbitMQ中,可使用死信来及延时队列插件
rabbitmq_delayed_message_exchange两种方式实现延时队列。

使用插件可以在官网找到:
https://www.rabbitmq.com/community-plugins.html

插件的安装及使用方式就不做介绍了,主要介绍下使用死信来实现延时队列,原理就是将消息发送到一个死信队列,并设置过期时间,过期后将私信转发到要处理的消息队列。
生产者相关代码:

          /// 
        /// 发送延时队列消息
        /// 
        /// 
        /// 
        /// 默认20
        public void SendDelayQueues(string message, string queueName,double delayMilliseconds,string beDeadLetterPrefix="beDeadLetter_")
        {
            #region 死信到期后转入的交换机及队列
            //死信转入新的队列的路由键(消费者使用的路由键)
            var routingKey = queueName;
            var exchangeName = queueName;
            //定义队列
            Channel.QueueDeclare(queue: queueName,
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: null);
            //定义交换机
            Channel.ExchangeDeclare(exchange: exchangeName,
                type: "direct");
            //队列绑定到交换机
            Channel.QueueBind(queue: queueName,
                exchange: exchangeName,
                routingKey: routingKey);
            #endregion

            //将变成死信的队列名
            var beDeadLetterQueueName = beDeadLetterPrefix + queueName;
            //将变成死信的交换机名
            var beDeadLetterExchangeName = beDeadLetterPrefix + queueName;

            //定义一个有延迟的交换机来做死信(该消息不能有消费者,不然无法变成死信)
            Channel.ExchangeDeclare(exchange:beDeadLetterExchangeName ,
                type: "direct");
            
            //定义该延迟消息过期变成死信后转入的交换机(消费者需要绑定的交换机)
            //Channel.ExchangeDeclare(exchange: queueName,type: "direct");

            var dic = new Dictionary();
            //dic.Add("x-expires", 30000);
            //dic.Add("x-message-ttl", 12000);//队列上消息过期时间,应小于队列过期时间  
            dic.Add("x-dead-letter-exchange", queueName);//变成死信后转向的交换机
            dic.Add("x-dead-letter-routing-key",routingKey);//变成死信后转向的路由键
            //定义将变成死信的队列
            Channel.QueueDeclare(queue: beDeadLetterQueueName,
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: dic);

            //队列绑定到交换机
            Channel.QueueBind(queue: beDeadLetterQueueName,
                exchange: beDeadLetterExchangeName,
                routingKey: routingKey);

            //不要同时给一个消费者推送多于prefetchCount个消息, ushort prefetchCount = 20
            //Channel.BasicQos(prefetchSize: 0, prefetchCount: prefetchCount, global: false);
            var body = Encoding.UTF8.GetBytes(message);
            var properties = Channel.CreateBasicProperties();
            properties.Persistent = true;
            properties.DeliveryMode = 2;//持久化消息
            //过期时间
            properties.Expiration = delayMilliseconds.ToString();
            Channel.BasicPublish(exchange: beDeadLetterExchangeName,
                routingKey: routingKey,
                basicProperties: properties,
                body: body);
        }

消费者相关代码:

        /// 
        /// 设置延迟队列接收的事件
        /// 
        /// 
        /// 
        /// 默认1
        /// 
        /// 
        public void SetDelayQueuesReceivedAction(Action action, string queueName, ushort prefetchCount = 1,
            bool autoAck = false, int consumerCount = 1)
        {
            if (prefetchCount < 1)
            {
                throw new Exception("consumerCount must be greater than 1 !");
            }

            var exchangeName = queueName;
            var routingKey = queueName;
            for (int i = 0; i < consumercount i var channel='Connection.CreateModel();' channel.queuedeclarequeue: queuename durable: true exclusive: false autodelete: false arguments: null channel.exchangedeclareexchange: exchangename type: direct channel.queuebindqueue: queuename exchange: exchangename routingkey: routingkey prefetchcount channel.basicqosprefetchsize: 0 prefetchcount: prefetchcount global: false channellist.addchannel var consumer='new' eventingbasicconsumerchannel consumer.received>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    //Console.WriteLine("处理消费者ConsumerTag:" + ea.ConsumerTag);
                    action(message);
                    //手动确认消息应答
                    Channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                };
                //autoACK自动消息应答设置为false
                Channel.BasicConsume(queue: queueName, autoAck: autoAck, consumer: consumer);
            }
        }

如果感觉本文对你有帮助关注我一起学习进步

原文地址:
https://www.cnblogs.com/townsend/p/13663544.html

相关文章

一篇文章搞懂同步与异步、阻塞与非阻塞

要想掌握好Java NIO需要涉及了解同步与异步、阻塞与非阻塞,本文通过相关例子让你深入理解其本质@mikechen阻塞阻塞与非阻塞是对同一个线程来说的,在某个时刻,线程要么处于阻塞,要么处于非阻塞。...

数组阻塞队列ArrayBlockingQueue底层数据结构和使用场景

1. 数组存储元素:ArrayBlockingQueue 使用一个固定大小的数组来存储队列中的元素。在创建 ArrayBlockingQueue 实例时,需要指定数组的容量,例如:BlockingQu...

终于有人把Java程序员都要学的知识点整理了,令人茅塞顿开

JVM无论处于何种层级的 Java 从业者,JVM 皆为其在进阶之途上必然需要跨越的一道难关。不论是在日常的工作情境之中,还是在至关重要的面试环节里,JVM 均是不可或缺的必考之题。倘若对 JVM 缺...

阻塞模型将会使线程休眠,为什么 Java 线程状态却是 RUNNABLE?

使用 Java 阻塞 I/O 模型读取数据,将会导致线程阻塞,线程将会进入休眠,从而让出 CPU 的执行权,直到数据读取完成。这个期间如果使用 jstack 查看线程状态,却可以发现Java 线程状态...

JAVA并发之BlockingQueue(阻塞队列)

Java从JDK5开始在并发包内引入了BlockingQueue(阻塞队列),它除了提供队列的FIFO功能之外,还提供了额外的功能,例如:当获取队列内容时发现队列为空,则等待其变为非空。当往队列存储内...