# rabbitmq学习 **Repository Path**: carp-notes/rabbit-learn ## Basic Information - **Project Name**: rabbitmq学习 - **Description**: 学习RabbitMQ并记录笔记 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 1 - **Created**: 2022-01-13 - **Last Updated**: 2026-01-08 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 一、入门 ## 1、消息队列 ### 1)、MQ的相关概念 #### (1)、什么是MQ ​ MQ(Message Queue),从字面意思看,本质上是一个队列,遵循FIFO先入先出原则,只不过队列中存放的内容是Message而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ是一种非常常见的上下游“逻辑解耦+物流解耦”的消息通信服务。使用了MQ后,消息发送上游只需要依赖MQ,不用依赖其他服务。 #### (2)、为什么要用MQ ##### (a)、流量消峰 ##### (b)、应用解耦 ##### (c)、异步处理 #### (3)、MQ的分类 ##### (a)、ActiveMQ ​ **优点**:单机吞吐量万级,时效性ms级,可用性高,基于主从架构高可用性,消息可靠性较低的概率丢失数据。 ​ **缺点**:官方社区对ActiveMQ5.x维护减少,高吞吐量场景使用较少。 ##### (b)、Kafka ​ 大数据杀手锏,谈到大数据领域内的消息传输,则绕不开Kafka,这款为大数据而生的消息中间件,以其百万级TPS的吞吐量名声大噪,迅速成为大数据领域的宠儿,在数据采集、传输、存储的过程中发挥着举足轻重的作用。 ​ **优点**:性能卓越,单机写入TPS约在百万条/秒,最大的优点,就是吞吐量高。时效性ms级可用性非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用,消费者采用Pull方式获取消息,消息有序,通过控制能够保证所有消息被消费且仅被消费一次;有优秀的第三方管理界面Kafka-Manager;在日志领域比较成熟,被多家公司和多个开源项目使用;功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用。 ​ **缺点**:Kafka 单机超过 64 个队列/分区,Load 会发生明显的飙高现象,队列越多,load 越高,发送消息响应时间变长,使用短轮询方式,实时性取决于轮询间隔时间,消费失败不支持重试;支持消息顺序,但是一台代理宕机后,就会产生消息乱序,**社区更新较慢**; ##### (c)、RocketMQ ​ RocketMQ 出自阿里巴巴的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一些改进。 ​ **优点**:单机吞吐量十万级,可用性非常高,分布式架构,消息可以做到 0 丢失**,**MQ 功能较为完善,还是分布式的,扩展性好,支持 10 亿级别的消息堆积,不会因为堆积导致性能下降,源码是 java 我们可以自己阅读源码,定制自己公司的 MQ。 ​ **缺点**:支持的客户端语言不多,目前是 java 及 c++,其中 c++不成熟;社区活跃度一般,没有在 MQ 核心中去实现 JMS 等接口,有些系统要迁移需要修改大量代码 ##### (d)、RabbitMQ ​ 2007 年发布,是一个在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。 ​ **优点**:由于 erlang 语言的高并发特性,性能较好;吞吐量到万级,MQ 功能比较完备,健壮、稳定、易用、跨平台、支持多种语言 如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持 AJAX 文档齐全;开源提供的管理界面非常棒,用起来很好用,社区活跃度高;更新频率相当高ttps://www.rabbitmq.com/news.html ​ **缺点**:商业版需要收费,学习成本较高 #### (4)、MQ的选择 ##### (a)、Kafka ​ Kafka 主要特点是基于 Pull 的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生**大量数据**的互联网服务的数据收集业务。**大型公司**建议可以选用,如果有**日志采集**功能,肯定是首选 kafka 了。 ##### (b)、RocketMQ ​ 天生为**金融互联网**领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况。RoketMQ 在稳定性上可能更值得信赖,这些业务场景在阿里双 11 已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择 RocketMQ。 ##### (c)、RabbitMQ ​ 结合 erlang 语言本身的并发优势,性能好**时效性微秒级**,**社区活跃度也比较高**,管理界面用起来十分方便,如果你的**数据量没有那么大**,中小型公司优先选择功能比较完备的 RabbitMQ。 ## 2、RabbitMQ ### 1)、概念 ​ RabbitMQ是一个消息中间件,接收并转发消息。 ### 2)、四大核心概念 - **生产者** 产生数据并发送消息的程序 - **交换机** 交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定 - **队列** 队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式。 - **消费者** 消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。 ![1641893086670](images/1641893086670.png) ### 3)、RabbitMQ核心部分 https://www.rabbitmq.com/getstarted.html ![RabbitMQ.png](images/RabbitMQ.png) ### 4)、名词介绍 - **Broker:表示消息队列服务器实体** RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现。RabbitMQ是一个消息代理 - 一个消息系统的媒介。它可以为你的应用提供一个通用的消息发送和接收平台,并且保证消息在传输过程中的安全。 - **Message :消息** - **Publisher :消息的生产者,也是一个向交换器发布消息的客户端应用程序** - **Exchange :交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。** Exchange有4种类型:**direct(默认),fanout, topic, 和headers**,不同类型的Exchange转发消息的策略有所区别 - **Queue 消息队列,用来保存消息直到发送给消费者。** 它是消息的容器,也是消息的终点。一个消息 可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。 - **Binding 绑定,用于消息队列和交换器之间的关联。** 一个绑定就是基于路由键将交换器和消息队列连 接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。 Exchange 和Queue的绑定可以是多对多的关系。 - **Connection 网络连接,比如一个TCP连接。** - **Channel 信道,多路复用连接中的一条独立的双向数据流通道。** 信道是建立在真实的TCP连接内的虚 拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这 些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所 以引入了信道的概念,以复用一条 TCP 连接。 - **Consumer 消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。** - **Virtual Host 虚拟主机,表示一批交换器、消息队列和相关对象。** 虚拟主机是共享相同的身份认证和加 密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有 自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定, RabbitMQ 默认的 vhost 是 / 。 ### 5)、安装(windows) #### (1)、安装Erlang ##### (a)、下载 官方下载地址:https://www.erlang.org/downloads 但是注意,需要根据rabbitmq的版本来,所以我这里下载的是`23.3.4.10`; 下载地址如下:(如果下载太慢,可以使用镜像网站进行下载,如:https://ghproxy.com/) https://github.com/erlang/otp/releases/download/OTP-23.3.4.10/otp_win64_23.3.4.10.exe ![1642076924907](images/1642076924907.png) ##### (b)、点击安装 这里是.exe文件,一直点下一步即可 ##### (c)、配置环境变量 配置`ERLANG_HOME` ![1642077402371](images/1642077402371.png) 配置Path ![1642077484856](images/1642077484856.png) #### (2)、安装RabbitMQ ##### (a)、下载 进入开始页面,点击下载(由于页面加载较慢,需要等待一会,才能看见下面的页面) ![1642077668691](images/1642077668691.png) 选择压缩包下载 ![1642077754819](images/1642077754819.png) ##### (b)、解压缩安装包到指定位置 ![1642078081286](images/1642078081286.png) ##### (c)、配置环境变量 **新增RABBITMQ_SERVER变量** ![1642078145445](images/1642078145445.png) **修改PATH变量** ![1642078246050](images/1642078246050.png) ##### (d)、启动管理插件 注意:启动插件时,rabbitmq服务必须是关闭的 cmd使用`rabbitmq-plugins.bat enable rabbitmq_management` 命令启动管理插件 ![1642078499347](images/1642078499347.png) ##### (e)、启动服务 cmd输入`rabbitmq-service.bat install`启动服务 **注意**:这里需要使用管理员权限使用cmd,否则可能出现如下异常 ![1642079055735](images/1642079055735.png) 安装成功 ![1642079177771](images/1642079177771.png) 启动服务(方式一) ![1642079288216](images/1642079288216.png) 启动服务(方式二) 使用管理员权限启动命令行,使用`rabbitmq-service start`启动服务 ![1642079372361](images/1642079372361.png) ##### (f)、查看状态 cmd输入`rabbitmqctl status`命令查看状态 ##### (g)、访问页面 地址:http://localhost:15672/,账号密码:guest/guest ![1642079444185](images/1642079444185.png) ![1642079485639](images/1642079485639.png) ### 6、安装(Centos7) #### (1)、安装Erlang ##### (a)、下载 官方下载地址:https://www.erlang.org/downloads 但是注意,需要根据rabbitmq的版本来,所以我这里下载的是`23.3.4.10`; ![1714027766180](images/1714027766180.png) ##### (b)、安装 解压,并进入目录 ``` tar -zxvf otp_src_23.3.4.10.tar.gz cd otp_src_23.3.4.10 ``` 安装依赖 ``` # 安装依赖 yum -y install gcc glibc-devel make ncurses-devel openssl-devel xmlto perl wget socat gcc-c++ kernel-devel m4 mesa* freeglut* ``` configure到指定目录 ``` ./configure --prefix=/opt/erlang/erlang_23 ``` 可能会出现如下错误 ![1714036271288](images/1714036271288.png) ``` # 更新epel第三方软件库 yum install -y epel-release # 再次执行,就可以安装了 yum install –y wxWidgets-devel yum install wxBase #for /usr/bin/wx-config-3.0 cd /usr/bin # 创建软连接 ln -s wx-config-3.0 wx-config ``` 编译 ``` make && make install ``` ##### (c)、配置环境变量 配置`ERLANG_HOME、Path 更新配置 ``` source /etc/profile ``` ![1714038086462](images/1714038086462.png) ![1714038173712](images/1714038173712.png) #### (2)、安装RabbitMQ ##### (a)、下载 https://www.rabbitmq.com/docs/download 进入开始页面,点击下载(由于页面加载较慢,需要等待一会,才能看见下面的页面),选择压缩包下载 如果需要下载老版本可以去github上找,https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.9.12 ![1714053084964](images/1714053084964.png) ##### (b)、解压缩安装包到指定位置 ``` tar -xvf rabbitmq-server-generic-unix-3.9.12.tar.xz ``` ##### (c)、配置环境变量 **新增RABBITMQ_SERVER变量** **修改PATH变量** ![1714053250645](images/1714053250645.png) ##### (d)、启动管理插件 注意:启动插件时,rabbitmq服务必须是关闭的 `rabbitmq-plugins enable rabbitmq_management ` 命令启动管理插件 ![1714053475562](images/1714053475562.png) ##### (e)、启动服务 rabbitmq-server start ``` # 启动MQ rabbitmq-server start # 要后台启动可以使用 rabbitmq-server -detached # 关闭MQ rabbitmqctl stop # 查看状态 rabbitmqctl status rabbitmqctl list_users // 列出当前系统的用户 rabbitmqctl list_user_permissions [用户名] // 查看用户权限 rabbitmqctl delete_user [用户名] // 删除用户 rabbitmqctl change_password [用户名] [密码] // 修改用户密码 rabbitmq-plugins enable rabbitmq_management // 开启web远程管理界面 rabbitmq-plugins disable rabbitmq_management // 关闭web远程管理界面 systemctl start rabbitmq-server // 启动rabbitmq-server服务 systemctl stop rabbitmq-server // 停止rabbitmq-server服务 systemctl restart rabbitmq-server // 重启rabbitmq-server服务 systemctl reload rabbitmq-server // 重载rabbitmq-server服务 (推荐使用) systemctl status rabbitmq-server // 查看rabbitmq-server状态 systemctl enable rabbitmq-server // 设置为开机启动rabbitmq-server服务 ``` ![1714056910936](images/1714056910936.png) ##### (f)、新增用户 执行下面的命令创建一个用户 rabbitmqctl add_user 用户名 密码 执行下面的命令设置用户为超级管理员。 rabbitmqctl set_user_tags 用户名 administrator 设置 admin 用户的权限,指定允许访问的vhost以及write/read rabbitmqctl set_permissions -p "/" 用户名 ".*" ".*" ".*" 成功登录 ![1714057074686](images/1714057074686.png) # 二、核心 ## 1、Hello world 官网地址:https://www.rabbitmq.com/tutorials/tutorial-one-java.html 在本教程的这一部分中,我们将用 Java 编写两个程序。发送单个消息的生产者和接收消息并打印 出来的消费者。我们将介绍 Java API 中的一些细节。 在下图中,“ P”是我们的生产者,“ C”是我们的消费者。中间的框是一个队列-RabbitMQ 代 表使用者保留的消息缓冲区 ![(images/python-one.png) -> [|||] -> (C)](https://www.rabbitmq.com/img/tutorials/python-one.png) ### 1)、依赖 ```xml com.rabbitmq amqp-client 5.9.0 commons-io commons-io 2.8.0 ``` ### 2)、生产者 1. 创建连接 - 创建连接工厂 - 配置连接信息 - 创建连接 2. 创建队列 - 获得信道 - 创建队列 3. 发送消息 - 消息文本 - 发送消息 4. 关闭连接 示例:[生产者示例代码](01-hello/src/main/java/com/liyu/rabbit/Producer.java) ### 3)、消费者 1. 创建连接 - 创建连接工厂 - 配置连接信息 - 创建连接 2. 监听消息队列 - 创建信道 - 消费消息 示例:[消费者示例代码](01-hello/src/main/java/com/liyu/rabbit/Consumer.java) 注意:消费者不需要关闭连接 ### 4)、效果 生产者生产消息 ![1646277912242](images/1646277912242.png) 消费者消费消息 ![1646277929986](images/1646277929986.png) ## 2、Work Queues 工作队列(又名:任务队列)是为了避免立即执行一项资源密集型任务,而不得不等待它完成。相反,我们将任务安排在以后完成。我们把任务封装成一个消息发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当运行许多工作线程时,任务将在它们之间共享。 ### 1)、循环调度 在这个案例中我们会启动两个工作线程(消费者),一个消息发送线程(生产者),我们来看看他们两个工作线程 是如何工作的。 #### (1)、工厂类 在1中,我们发现创建信道(Channel)的这部分代码在生产者和消费者中是重复的,因此可以创建一个工厂类来创建信道。 示例:[示例代码](02-work-queues/src/main/java/com/liyu/rabbit/factory/RabbitFactory.java) #### (2)、消费者 在消费者中,我们利用线程池,创建两个消费者线程,去消费消息 示例:[示例代码](02-work-queues/src/main/java/com/liyu/rabbit/roll/Consumer.java) #### (3)、生产者 在生产者中,我们通过控制台,重复输入消息 示例:[示例代码](02-work-queues/src/main/java/com/liyu/rabbit/Producer.java) #### (4)、效果 我们先再生产者这边发送几条消息,再去消费者查看消费情况,发现消费线程是轮询进行消费的 ![1646310172972](images/1646310172972.png) ![1646310227212](images/1646310227212.png) ### 2)、消息确认 #### (1)、概念 消费者接收并处理完一条消息需要一定的时间,在这个过程中,如果消费者还没处理完,那么就会发生消息丢失。 为了保证消息在消费过程中不丢失,rabbitmq引入了消息应答机制:消费者在接收到消息并处理完消息之后,告诉rabbitmq它已经处理了,rabbitmq可以把该消息删除了。 #### (2)、自动应答 消息发送后即被认为已经传输成功,这种模式吞吐量较高,但是容易发生消息丢失,也有可能导致消息者消费过载,接收到太多来不及处理的消息,最终导致消息积压,内存耗尽。 #### (3)、消息应答的方法 `Channel.basicAck`:肯定确定 `Channel.basicNack`:否定确认 `Channel.basicReject`: 否定确认,和`Channel.basicNack`的区别:缺少`multiple`批量应答参数 #### (4)、修改消费者 - 修改消费`Channel.basicConsume`方法的`autoAck`参数未`false` - 使用`basicNack`方法进行否认确认或`basicAck`肯定确认应答 示例:[示例代码](02-work-queues/src/main/java/com/liyu/rabbit/roll/ManualConsumer.java) #### (5)、效果 消费失败的消息会重新进入队列,由其他队列消费 ![1652619127883](images/1652619127883.png) #### (6)、注意 不要忘记应答 思考:如果在生产者发消息时,MQ挂了,这个时候生产者不知道,怎么办?具体请看发布订阅模式 ### 3)、消息持久化 #### (1)、修改生产者 - `channel.queueDeclare`的`durable`参数修改为`true`,使队列持久化 - `channel.basicPublish`的props参数使用`MessageProperties.PERSISTENT_TEXT_PLAIN`,表示持久化消息 示例:[示例代码](02-work-queues/src/main/java/com/liyu/rabbit/Producer.java) 注意:这种错误表示,要先删除原有的队列 ![1652619597312](images/1652619597312.png) 出现D,表示是持久化的 ![1652619699352](images/1652619699352.png) #### (2)、效果 生产者生产几条消息,我们重启rabbit服务,看看消息还在不在 ![1652620062232](images/1652620062232.png) 查看时,消息仍在 ![1652625332066](images/1652625332066.png) ### 4)、公平调度 #### (1)、概念 ​ 假如有两个消费者,其中一个消费者处理很慢,一个处理很快,这个时候,我们采用循环调度的方式,这是**公平调度**,但是我们希望处理更快的消费者能处理更多的消息,这就是**不公平调度** ![1652704164198](images/1652704164198.png) #### (2)、处理 `Channel.basicQos`方法可以设置消费者同时最多可以接收处理多少条消息,如果是1,那么在它确认应答之前,不向它发送新消息,而把它分配给其他可以处理的消费者 #### (3)、示例 示例:[示例代码](02-work-queues/src/main/java/com/liyu/rabbit/roll/FairConsumer.java) 创建两个消费者,一个处理消息要阻塞5秒,另一个直接处理,发送5条消息,查看效果 理想效果:阻塞的消费者只处理一条消息,其他消息都被另一个消费者处理 ![1652703714918](images/1652703714918.png) #### (4)、思考 ​ 假设现在有两个消费者A、B,他们处理速度差不多,这个时候我们给A的qos一个设置成1,B设成2;这种情况下,如果同时出现大量请求,会有什么效果。 ​ B处理的请求大概是A的2倍。 ## 3、发布订阅 向多个消费者传递一条消息,这种模式叫发布订阅。生产者发送一条消息时,mq向多个消费者(不一定是全部)进行广播。 ### 1)、交换机 #### (1)、简介 ​ 回顾之前的内容,我们可以发现 - **生产者**——发送消息 - **消息队列**——存储消息 - **消费者**——接收消息 在这里,引入一个新的概念——**交换机**,接收生产者的消息并分发给消息队列 #### (2)、交换机的类型 - **`direct`**——直连交换机 根据消息携带的路由键(routing key)将消息投递给对应队列的。 - **`topic`**——主题交换机 队列通过路由键绑定到交换机上,然后,交换机根据消息里的路由值,将消息路由给一个或多个绑定队列。 - **`headers`** ——头交换机 头交换机使用多个消息头属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。 - **`fanout`**——扇型交换机 将消息路由给绑定到它身上的所有队列。 除此之外还有两种特殊的交换机 - **`default`**——默认交换机 实际上是一个由RabbitMQ预先声明好的名字为空字符串的直连交换机 - **`Dead Letter `**——死信交换机 当消费者不能处理接收到的消息时,将这个消息重新发布到另外一个队列中,等待重试或者人工干预。 ### 2)、扇出(fanout)简单实例 将消息路由给绑定到它身上的所有队列。 #### (1)、生产者 示例:[示例代码](03-publish-subscribe/src/main/java/com/liyu/rabbit/Producer.java) - 初始参数 ```java // 队列名称 public static final String QUEUE_NAME = "hello"; public static final String QUEUE_NAME2 = "hello2"; // 交换机名称 public static final String EXCHANGE_NAME = "ex_hello"; ``` - 获得通道 - 声明并创建队列 ```java channel.queueDeclare(QUEUE_NAME, true, false, false, null); channel.queueDeclare(QUEUE_NAME2,true, false, false, null); ``` - 设置交换机名称和类型 ```java channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); ``` - 绑定消息队列 ```java // 绑定交接机-路由-消息队列 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "rk_" +QUEUE_NAME); channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "rk_" +QUEUE_NAME2); ``` - 发布消息 ```java // 发送消息 这里要设置交换机名称,但是不设置路由 // MessageProperties.PERSISTENT_TEXT_PLAIN:表示持久化 channel.basicPublish(EXCHANGE_NAME,"", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes(StandardCharsets.UTF_8)); ``` #### (2)、效果 使用生产者发送一条消息 ![1652712067283](images/1652712067283.png) **理想效果**:hello和hello2俩个消息队列都有该消息 **实际效果**:hello和hello2俩个消息队列都有该消息,实际如下 hello队列: ![1652711910116](images/1652711910116.png) hello2队列 ![1652712017322](images/1652712017322.png) ### 3)、发布确认 使用消息代理(如 RabbitMQ)的系统按照定义是分布式的。由于发送的协议方法(消息)不能保证到达对等方或被对等方成功处理,因此发布者和消费者都需要一种用于传递和处理确认的机制。 https://www.rabbitmq.com/confirms.html ##### 消费者交付确认 当 RabbitMQ 向消费者传递消息时,它需要知道何时认为消息发送成功。在[2.2.2【消息确认】](#2)、消息确认)中已经接触过; ##### 传递标签 在我们继续讨论其他主题之前,重要的是解释如何识别交付(并且确认表明它们各自的交付)。当注册消费者(订阅)时,RabbitMQ 将使用`basic.deliver` 方法传递(推送)消息。该方法带有一个传递标签`delivery tag`,它唯一地标识一个通道上的传递。因此,交付标签的范围是每个渠道。 交付标签是单调增长的正整数,并由客户端库呈现。确认交付的客户端库方法将交付标签作为参数 ##### 生产者发布确认 https://www.rabbitmq.com/confirms.html#publisher-confirms 生产者发送消息到消息队列的过程中,可能因为网络,或其他原因,导致消息持久化失败,但是这个时候生产者并不知道,所以发生了消息丢失。 保证消息不丢失的唯一方法是使用事务——使通道具有事务性,然后为每条消息或一组消息发布、提交。 #### (1)、单个发布确认 示例:[示例代码](03-publish-subscribe/src/main/java/com/liyu/rabbit/PublisherConfirm.java)中的`singleConfirm`方法 步骤如下: - 创建信道 - 开启发布确认 ```java channel.confirmSelect(); ``` - 创建队列 - 发送单条消息 - 获得等待确认结果 ```java boolean b = channel.waitForConfirms(); ``` #### (2)、批量发布确认 示例:[示例代码](03-publish-subscribe/src/main/java/com/liyu/rabbit/PublisherConfirm.java)中的`batchConfirm`方法 步骤如下: - 创建信道 - 开启发布确认 - 创建队列 - 发送多条消息 - 获得等待确认结果 #### (3)、异步发布确认消息 示例:[示例代码](03-publish-subscribe/src/main/java/com/liyu/rabbit/PublisherConfirm.java)中的`asyncConfirm`方法 步骤如下: - 创建信道 - 开启发布确认 - 创建队列 - 添加发布确认监听器 ```java ConfirmCallback ackCallback = (deliveryTag, multiple) ->{ // System.out.println("消息发送失败:" +deliveryTag); }; ConfirmCallback nackCallback = (deliveryTag, multiple) ->{ System.out.println("消息发送失败:" +deliveryTag); }; channel.addConfirmListener( ackCallback, nackCallback); ``` - 发送消息 #### (4)、三种方式发送1000条消息的效率比较 ![1653204104079](images/1653204104079.png) ## 4、路由 在发布订阅模式中,我们绑定队列和交换机时,设置了一个routingKey参数,它用来表示交换机和队列的关系。 ![1653206837798](images/1653206837798.png) ### 1)、Direct直连交换机 交换机将消息分发给完全匹配的队列。 示例:[示例代码](04-routing-key/src/main/java/com/liyu/rabbit/producer/DirectProducer.java) ##### 步骤: - 创建信道 - 开启发布确认 - 添加发布确认监听 - 声明直连交换机 - 声明队列 - 建立交换机和队列路由关系 - 根据路由发送消息 ##### 效果: ![1653224525686](images/1653224525686.png) ![1653224540072](images/1653224540072.png) ### 2)、Topic主题交换机 #### 简介: 主题交换机的路由必须由单词列表和"."号组成,如“hello.world”,这些单词可以是任何字符,但是一般使用和消息相关的一些特性。主题的交换机和直接交换机类似,都是使用路由匹配发送到指定队列。但是主题有两个特殊情况 - `*`(星号)可以只替换一个单词。 - `#` (hash) 可以代替零个或多个单词。 如:在以下示例中, ![img](images/python-five.png) | 路由key | 匹配路由 | 匹配队列 | | --------------- | -------------------------- | -------- | | a.orange.b | \*.orange.\* | Q1 | | a.orange | \ | 无 | | orange.b | \ | 无 | | a.b.rabbit | \*.\*.rabbit | Q2 | | a.b.rabbit.c | \ | 无 | | a.orange.rabbit | \*.orange.\*、\*.\*.rabbit | Q1、Q2 | | lazy | lazy.\# | Q2 | | lazy.a | lazy.\# | Q2 | | lazy.a.b.c | lazy.\# | Q2 | | lazy.orange.a | lazy.\#、\*.orange.\* | Q1、Q2 | | a.lazy.b | \ | 无 | | a.lazy.rabbit | \*.\*.rabbit | Q2 | #### (1)、示例 ##### 生产者: - 创建连接、创建信道 - 开启发布确认、添加发布确实监听 - 声明交换机、声明队列 - 绑定交换机和队列 - 发送消息 示例:[示例代码](04-routing-key/src/main/java/com/liyu/rabbit/producer/TopicProducer.java) ##### 消费者: - 创建连接、创建信道 - 实现消费消息代码 示例:[示例代码](04-routing-key/src/main/java/com/liyu/rabbit/consumer/Consumer.java) #### (2)、效果 ![1655825533252](images/1655825533252.png) ![1655825573070](images/1655825573070.png) ### 3)、死信队列 产生死信的条件,官方文档:https://www.rabbitmq.com/dlx.html - 消费者进行否定确认,并将参数requeue设置为false; - 消息超过设定的ttl有效时间 - 消息超过队列最大可用长度 #### (1)、生产者 - 创建信道、开启发布确认 - 声明普通交换机 - 绑定普通交换机和队列 - 创建消息序列化参数 expiration表示ttl有效时间 ```java BasicProperties basicProperties = new BasicProperties("text/plain", null, null, 2, 0, null, null, // 到期时间:10s // "10000", null, null, null, null, null, null, null); ``` - 发送消息 ```java channel.basicPublish(DlxConsumer.NORMAL_HELLO_EXCHANGE,DlxConsumer.NORMAL_HELLO_QUEUE, basicProperties, msg.getBytes(StandardCharsets.UTF_8)); ``` #### (2)、消费者 - 创建信道 - 声明普通队列 使用arguments绑定死信队列、死信路由、设置队列最大长度等 ```java /* start----------------------------------- 普通正常队列 ----------------------------- */ Map arguments = new HashMap<>(); // 死信交换机 arguments.put("x-dead-letter-exchange", DLX_HELLO_EXCHANGE); // 死信路由key arguments.put( "x-dead-letter-routing-key" , DLX_HELLO_QUEUE ); // 设置队列最大长度 arguments.put("x-max-length", 2); // 过期时间:建议生产者设置 // arguments.put( "x-message-ttl" , 1000 ); channel.queueDeclare(NORMAL_HELLO_QUEUE, false, false, false, arguments); channel.queueBind(NORMAL_HELLO_QUEUE, NORMAL_HELLO_EXCHANGE, NORMAL_HELLO_QUEUE); ``` - 普通消费者监听消费 注意:测试过期时间、最大可用长度时,我们不消费队列 ```java // 取消消费逻辑,保证消息存在于队列中不被消费 DeliverCallback callback = (consumerTag, message) -> { System.out.println("普通队列"+NORMAL_HELLO_QUEUE+"拒绝消费了一条消息:" + new String(message.getBody(), StandardCharsets.UTF_8)); channel.basicNack(message.getEnvelope().getDeliveryTag(), false, false); }; channel.basicConsume(NORMAL_HELLO_QUEUE, false, callback, consumerTag -> {}); ``` #### (3)、死信队列和交换机 - 创建信道 - 声明死信队列 - 声明死信交换机 - 绑定死信队列和交换机 - 死信队列消费逻辑 ```java /* start--------------------------------- 死信队列 ---------------------------------- */ // 这里需要死信交换机 channel.exchangeDeclare(DLX_HELLO_EXCHANGE, BuiltinExchangeType.DIRECT); channel.queueDeclare(DLX_HELLO_QUEUE, false, false, false, null); channel.queueBind(DlxConsumer.DLX_HELLO_QUEUE, DLX_HELLO_EXCHANGE, DLX_HELLO_QUEUE); DeliverCallback dlxcallback = (consumerTag, message) -> { System.out.println("死信队列"+ DlxConsumer.DLX_HELLO_QUEUE+"消费了一条被溢出的消息:" + new String(message.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume(DlxConsumer.DLX_HELLO_QUEUE, true, dlxcallback, consumerTag -> {}); /* end--------------------------------- 死信队列 ---------------------------------- */ ``` #### (4)、示例代码 生产者:[示例代码](04-routing-key/src/main/java/com/liyu/rabbit/consumer/DlxConsumer.java) 消费者(包括死信部分):[示例代码](04-routing-key/src/main/java/com/liyu/rabbit/producer/DlxProducer.java) ## 5、延迟队列 ### 1)、利用死信队列+ttl(不推荐) 在2.4.3中, 我们知道,ttl消息过期可以触发死信队列,如果ttl的消息没有消费者,那么在到达过期时间后,会自动进入死信队列,这个可以实现延迟队列; ttl不推荐的原因是,队列按照先进先出原则,只会判断第一条消息会不会过期,这就会导致后面的消息可能没及时过期进入死信队列 ### 2)、插件实现 #### (1)、安装插件 rabbitmq插件基础知识:https://www.rabbitmq.com/plugins.html rabbitmq社区插件中心地址:https://www.rabbitmq.com/community-plugins.html 我们在这里可以找到延迟插件`rabbitmq_delayed_message_exchange` ![1656841095341](images/1656841095341.png) 下载ez文件,并保存到plugins目录下 ![1656841270412](images/1656841270412.png) 启用插件 ```shell rabbitmq-plugins enable rabbitmq_delayed_message_exchange ``` 然后重启服务,见到下面的类型则安装成功 ![1656841345720](images/1656841345720.png) #### (2)、示例 ##### 生产者:[示例代码](05-delayed-message/src/main/java/com/liyu/rabbit/producer/Producer.java) ![1656841635287](images/1656841635287.png) ##### **消费者:**[示例代码](05-delayed-message/src/main/java/com/liyu/rabbit/consumer/Consumer.java) #### (3)、效果 查看下面生产者和消费的时间,是否延迟了10秒 ![1656841861859](images/1656841861859.png) ![1656841894308](images/1656841894308.png) # 三、整合springboot ## 1、Hello world ### 1)、依赖 ```xml org.springframework.boot spring-boot-starter-amqp org.springframework.boot spring-boot-starter-web ``` ### 2)、yaml ```yaml server: port: 8000 spring: application: name: rabbit-boot rabbitmq: username: guest password: guest host: localhost port: 5672 ``` ### 3)、组件 分成三步 1. 创建交换机 2. 创建队列 3. 绑定队列到交换机 [示例代码](06-springboot/src/main/java/com/liyu/config/DirectComment.java) ```java public static final String DIRECT_EXCHANGE = "direct.ex.hello"; public static final String DIRECT_QUEUE = "direct.queue.hello"; public static final String DIRECT_RK = "direct.rk.hello"; /** * 声明交换机 * @return 交换机 */ @Bean("directExchange") public Exchange directExchange() { return new DirectExchange(DIRECT_EXCHANGE); } /** * 声明队列 * @return 队列 */ @Bean("directQueue") public Queue directQueue() { return new Queue(DIRECT_QUEUE); } /** * 绑定队列和交换机 * @param directQueue 队列 * @param directExchange 交换机 * @return 绑定关系 */ @Bean("directBinding") public Binding directBinding(Queue directQueue, Exchange directExchange) { return BindingBuilder.bind(directQueue).to(directExchange).with(DIRECT_RK).and(Collections.emptyMap()); } ``` ### 4)、生产者-发送消息 我们写一个Controller,使用rabbitTemplate发送消息 [示例代码](06-springboot/src/main/java/com/liyu/controller/DirectController.java) ```java @Resource private RabbitTemplate rabbitTemplate; @GetMapping("{msg}") public void send(@PathVariable("msg") String msg) { rabbitTemplate.convertAndSend(DirectComment.DIRECT_EXCHANGE, DirectComment.DIRECT_RK, msg); log.info("消息发送成功,交换机:{},路由:{},消息:{}", DirectComment.DIRECT_EXCHANGE, DirectComment.DIRECT_RK, msg); } ``` ### 5)、消费者-监听队列 [示例代码](06-springboot/src/main/java/com/liyu/listener/DirectConsumer.java) ```java @RabbitListener(queues = DirectComment.DIRECT_QUEUE) public void consume(Message message, Channel channel) throws IOException { String msg = new String(message.getBody(), StandardCharsets.UTF_8); log.info("队列{}消费了一条消息:{}", DirectComment.DIRECT_QUEUE, msg); // 手动应答,注意默认是自动应答,所以这里不需要设置手动应答 // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } ``` ### 6)、效果 通过url请求controller发送消息,查看日志 ![1657115669598](images/1657115669598.png) ## 2、死信队列 ### 1)、TTL、最大队列长度 [DlxComment.java(组件)](06-springboot/src/main/java/com/liyu/config/DlxComment.java) [DlxController.java(生产者)](06-springboot/src/main/java/com/liyu/controller/DlxController.java) [DlxConsumer.java(消费者)](06-springboot/src/main/java/com/liyu/listener/DlxConsumer.java) #### (1)、普通队列组件 分成三步 1. 创建交换机 2. 创建队列 3. 绑定队列到交换机 - **普通交换机** ```java @Bean("normalExchange") public Exchange normalExchange() { return new ExchangeBuilder(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT.getType()).build(); } ``` - **普通队列** 这里绑定死信交换机和死信队列;也可以在这里设置最大长度 ```java /** * 普通队列 */ @Bean("normalQueue") public Queue normalQueue() { // 注意:绑定死信队列和死信交换机 return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DLX_EXCHANGE).deadLetterRoutingKey(DLX_RK).build(); } ``` ```java /** * 普通队列(最大队列长度) */ @Bean("normalMaxLengthQueue") public Queue normalMaxLengthQueue() { // 注意:绑定死信队列和死信交换机,并设置最大队列长度 return QueueBuilder.durable(NORMAL_MAXLENGTH_QUEUE).deadLetterExchange(DLX_EXCHANGE).deadLetterRoutingKey(DLX_RK).maxLength(5).build(); } ``` - **绑定普通队列到普通交换机** ```java /** * 绑定普通队列到普通交换机 */ @Bean("normalBinding") public Binding normalBinding(Exchange normalExchange, Queue normalQueue) { return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_RK).noargs(); } ``` ```java /** * 绑定普通队列(最大队列长度)到普通交换机 * @param normalExchange * @param normalMaxLengthQueue * @return */ @Bean("normalMaxLengthBinding") public Binding normalMaxLengthBinding(Exchange normalExchange, Queue normalMaxLengthQueue) { return BindingBuilder.bind(normalMaxLengthQueue).to(normalExchange).with(NORMAL_MAXLENGTH_RK).noargs(); } ``` #### (2)、死信队列组件 和上面一样分三步 分成三步 1. 创建交换机 2. 创建队列 3. 绑定队列到交换机 ```java /** * 死信交换机 */ @Bean("dlxExchange") public Exchange dlxExchange(){ return new ExchangeBuilder(DLX_EXCHANGE, BuiltinExchangeType.DIRECT.getType()).build(); } /** * 死信队列 */ @Bean("dlxQueue") public Queue dlxQueue() { return QueueBuilder.durable(DLX_QUEUE).build(); } /** * 绑定死信队列到死信交换机 */ @Bean("dlxBinding") public Binding dlxBinding(Exchange dlxExchange, Queue dlxQueue) { return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(DLX_RK).noargs(); } ``` #### (3)、普通队列监听(消费) 这里要测试死信队列,如果普通队列消费了,就不是那么直观,所以这里不配置普通队列的消费者 #### (4)、死信队列监听(消费) ```java @RabbitListener(queues = DlxComment.DLX_QUEUE) public void dlxConsume(Message message, Channel channel) throws IOException { String msg = new String(message.getBody(), StandardCharsets.UTF_8); log.info("死信队列{}在{}消费了一条消息:{}", DlxComment.DLX_QUEUE, DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"), msg); } ``` #### (5)、Controller(生产) ttl测试生产消息 ```java @GetMapping("{msg}") public void send(@PathVariable("msg") String msg) { rabbitTemplate.convertAndSend(DlxComment.NORMAL_EXCHANGE, DlxComment.NORMAL_RK, msg, message -> { MessageProperties messageProperties = message.getMessageProperties(); // 设置十秒过期时间 messageProperties.setExpiration("10000"); return message; }); log.info("消息发送成功,交换机:{},路由:{},时间:{},消息:{}", DlxComment.NORMAL_EXCHANGE, DlxComment.NORMAL_RK, DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"), msg); } ``` 超过最大队列长度的消费者 ``` @GetMapping("maxlength/{msg}") public void sendMaxLength(@PathVariable("msg") String msg) { for (int i = 1; i <= 10; i ++ ) { rabbitTemplate.convertAndSend(DlxComment.NORMAL_EXCHANGE, DlxComment.NORMAL_MAXLENGTH_RK, msg+i, message -> message); log.info("消息发送成功,交换机:{},路由:{},时间:{},消息:{}", DlxComment.NORMAL_EXCHANGE, DlxComment.NORMAL_MAXLENGTH_RK, DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"), msg + 1); } } ``` #### (6)、效果 ttl效果,可以看到,正好10秒后,死信队列消费到了消息 ![1657719768786](images/1657719768786.png) ![1657719817280](images/1657719817280.png) 超过最大队列长度测试,我们发送了10条消息,正好有5条进入死信队列 ![1657720258479](images/1657720258479.png) ![1657720244556](images/1657720244556.png) ### 2)、否定确定 在TTL的基础上,加上普通队列消费者 ```java /** * 普通队列消费者 */ @RabbitListener(queues = DlxComment.NORMAL_QUEUE) public void normalConsume(Message message, Channel channel) throws IOException { String msg = new String(message.getBody(), StandardCharsets.UTF_8); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); log.info("普通队列{}在{}否定应答了一条消息:{}", DlxComment.NORMAL_QUEUE, DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"), msg); } ``` 并配置开启手动应答 ![1657721338202](images/1657721338202.png) 效果 ![1657721467082](images/1657721467082.png) ## 3、延迟队列 #### (1)、组件 - 交换机 用`delayed()`方法设置这是一个延迟队列 ```java @Bean("delayedExchange") public Exchange delayedExchange() { return new ExchangeBuilder(DELAYED_EXCHANGE, BuiltinExchangeType.DIRECT.getType()).delayed().build(); } ``` - 消息队列 ```java @Bean("delayedQueue") public Queue delayedQueue() { return QueueBuilder.durable(DELAYED_QUEUE).build(); } ``` - 绑定关系 ```java public Binding delayedBinding(Queue delayedQueue, Exchange delayedExchange) { return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs(); } ``` #### (2)、生产者Controller ```java @GetMapping("{msg}/{delay}") public void send(@PathVariable("msg") String msg, @PathVariable(value = "delay", required = false) Integer delay) { delay = ObjectUtils.isEmpty(delay) ? 10000 : delay; Integer finalDelay = delay; rabbitTemplate.convertAndSend(DelayedComment.DELAYED_EXCHANGE, DelayedComment.DELAYED_ROUTING_KEY, msg, message -> { MessageProperties messageProperties = message.getMessageProperties(); messageProperties.setDelay(finalDelay); return message; }); log.info("消息发送成功,时间:{},延迟:{},消息:{},交换机:{},路由:{}", DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"), finalDelay, msg, DelayedComment.DELAYED_EXCHANGE, DelayedComment.DELAYED_EXCHANGE ); } ``` #### (3)、消费者监听器 ```java @RabbitListener(queues = DelayedComment.DELAYED_QUEUE) public void consume(Message message, Channel channel) throws IOException { String msg = new String(message.getBody(), StandardCharsets.UTF_8); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info("延迟队列{}于{}消费了一条消息:{}", message.getMessageProperties().getConsumerQueue(), DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"), msg); } ``` #### (4)、效果 ![1658064463514](images/1658064463514.png) ## 4、发布确认 ### 1)、交换机发布确认 #### (1)、配置开启发布确认 ```yaml server: port: 8000 spring: application: name: rabbit-boot-publish rabbitmq: username: guest password: guest host: localhost port: 5672 listener: simple: acknowledge-mode: manual # 发布确认模式 # correlated:使用CorrelationData将确认与发送的消息关联起来。 # simple:在作用域内使用RabbitTemplate# waitforconfirmation()(或waitForConfirmsOrDie())。 # none:发布者确认被禁用(默认)。 publisher-confirm-type: correlated ``` #### (2)、发布消息,并添加发布确认 ```java @Resource private RabbitTemplate rabbitTemplate; // 添加监听 @PostConstruct private void init() { rabbitTemplate.setConfirmCallback((@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause)->{ String id = ObjectUtils.isEmpty(correlationData) ? "" : correlationData.getId(); if (ack) { log.info("消息发送成功,id={}",id); } else { log.warn("消息发送失败,id={},\n异常信息={}", id, cause); } }); } @GetMapping("{id}/{msg}") public void sendNoExchange(@PathVariable("id") String id, @PathVariable("msg") String msg) { // 设置correlationData,发布确认时,我们通过这个匹配信息 CorrelationData correlationData = new CorrelationData(); correlationData.setId(id); rabbitTemplate.convertAndSend("ex.no", "rk.no", msg, message -> { MessageProperties messageProperties = message.getMessageProperties(); // 设置十秒过期时间 messageProperties.setExpiration("10000"); return message; }, correlationData); } ``` #### (3)、效果 **交换机不存在时:** 发送请求: ![1658241100039](images/1658241100039.png) 效果: ![1658241146068](images/1658241146068.png) **消息队列不存在时** ![1658242504866](images/1658242504866.png) ![1658241992852](images/1658241992852.png) 发现如果交换机存在,但是队列不存在,这时候提示发送成功,这个时候发生了消息丢失 ### 2)、消息回退 在上一个部分中,交换机无法通过路由找到交换机时,消息丢失;我们可以设置消息回退防止消息丢失; #### (1)、yaml配置 在上例的配置基础上添加`spring.rabbitmq.publisher-returns= true` ![1658495997053](images/1658495997053.png) #### (2)、发送消息时,添加回退监听 ![1658496053589](images/1658496053589.png) #### (3)、效果 ![1658496098039](images/1658496098039.png) ​ ### 3)、备份交换机 概述:在前面,我们因为没有绑定队列或没有匹配的队列,导致消息发送失败,这个时候我们可以设置备份队列,通过备份交换机和队列重新发送消息。甚至可以添加异常消息监听。 #### (1)、yaml 同上 #### (2)、组件 ```java public static final String NORMAL_EXCHANGE = "normal.ex.hello"; public static final String HELLO_RK = "rk.hello"; public static final String ALTERNATE_EXCHANGE = "alternate.exhange.hello"; public static final String ALTERNATE_QUEUE = "alternate.queue.hello"; public static final String WARNING_RK = "#"; public static final String WARNING_QUEUE = "warning.queue.hello"; @Bean("normalExchange") public Exchange normalExchange() { return new ExchangeBuilder(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT.getType()).alternate(ALTERNATE_EXCHANGE).build(); } @Bean("alternateExchange") public Exchange alternateExchange() { return new ExchangeBuilder(ALTERNATE_EXCHANGE, BuiltinExchangeType.TOPIC.getType()).build(); } @Bean("alternateQueue") public Queue alternateQueue(){ return QueueBuilder.durable(ALTERNATE_QUEUE).build(); } @Bean("warningQueue") public Queue warningQueue() { return QueueBuilder.durable(WARNING_QUEUE).build(); } @Bean("alternateBinding") public Binding alternateBinding(Exchange alternateExchange, Queue alternateQueue) { return BindingBuilder.bind(alternateQueue).to(alternateExchange).with(HELLO_RK).noargs(); } @Bean("warnBinding") public Binding warnBinding(Exchange alternateExchange, Queue warningQueue) { return BindingBuilder.bind(warningQueue).to(alternateExchange).with(WARNING_RK).noargs(); } ``` #### (3)、发送消息 ```java /** * 备份交换机测试 */ @GetMapping("alternate/{id}/{msg}") public void alternate(@PathVariable("id") String id, @PathVariable("msg") String msg) { // 设置correlationData,发布确认时,我们通过这个匹配信息 CorrelationData correlationData = new CorrelationData(); correlationData.setId(id); rabbitTemplate.convertAndSend(AlternateExchangeCommon.NORMAL_EXCHANGE, AlternateExchangeCommon.HELLO_RK, msg, message -> message, correlationData); } ``` #### (4)、消息监听 ```java @RabbitListener(queues = AlternateExchangeCommon.ALTERNATE_QUEUE) public void alterQueueListener(Channel channel, Message message) throws IOException { String msg = new String(message.getBody(), StandardCharsets.UTF_8); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); System.out.println("备份队列接收到信息:" + msg); } @RabbitListener(queues = AlternateExchangeCommon.WARNING_QUEUE) public void warnQueueListener(Channel channel, Message message) throws IOException { String msg = new String(message.getBody(), StandardCharsets.UTF_8); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); System.out.println("异常队列接收到信息:" + msg); } ``` #### (5)、效果 ![1658674832799](images/1658674832799.png) ### 4)、优先队列 - 声明优先队列 设置最大优先值`x-max-priority`,最大225,建议1-10 - 发送消息时设置优先值`x-priority`,值越大,优先级越高 #### (1)、组件 ```java public static final String PRIORITY_EXCHANGE = "priority.exchange.hello"; public static final String PRIORITY_QUEUE = "priority.queue.hello"; public static final String PRIORITY_ROUTING_KEY = "priority.rk.hello"; @Bean("priorityExchange") public Exchange priorityExchange() { return new ExchangeBuilder(PRIORITY_EXCHANGE, BuiltinExchangeType.DIRECT.getType()).build(); } @Bean("priorityQueue") public Queue priorityQueue() { return QueueBuilder.durable(PRIORITY_QUEUE).maxPriority(10).build(); } @Bean("priorityBinding") public Binding priorityBinding(Exchange priorityExchange, Queue priorityQueue) { return BindingBuilder.bind(priorityQueue).to(priorityExchange).with(PRIORITY_ROUTING_KEY).noargs(); } ``` #### (2)、发送消息 ```java /** * 优先级 */ @GetMapping("priority") public void priority() { sendPriority(1, "消息A"); sendPriority(2, "消息B"); sendPriority(3, "消息C"); sendPriority(4, "消息D"); sendPriority(5, "消息E"); } private void sendPriority(int priority, String msg) { rabbitTemplate.convertAndSend(PriorityExchangeCommon.PRIORITY_EXCHANGE, PriorityExchangeCommon.PRIORITY_ROUTING_KEY, msg, message -> { MessageProperties properties = message.getMessageProperties(); properties.setPriority(priority); return message; }); log.info("成功发送消息{},优先级:{}", msg, priority); } ``` #### (3)、消息监听 注意:等消息发送完成后,再添加消息监听,否则因为消息发送就被成功消费,是看不到效果的 ```java @RabbitListener(queues = PriorityExchangeCommon.PRIORITY_QUEUE) public void priorityListener(Channel channel, Message message) throws IOException { log.info("消费消息:{}", new String(message.getBody(), StandardCharsets.UTF_8)); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } ``` #### (4)、效果 根据设置的消息优先级,消费的消息顺序应该是EDCBA ![1659851901463](images/1659851901463.png)