# rabbitmq学习
**Repository Path**: carp-notes/rabbit-learn
## Basic Information
- **Project Name**: rabbitmq学习
- **Description**: 学习RabbitMQ并记录笔记
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 1
- **Forks**: 1
- **Created**: 2022-01-13
- **Last Updated**: 2026-01-08
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# 一、入门
## 1、消息队列
### 1)、MQ的相关概念
#### (1)、什么是MQ
MQ(Message Queue),从字面意思看,本质上是一个队列,遵循FIFO先入先出原则,只不过队列中存放的内容是Message而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ是一种非常常见的上下游“逻辑解耦+物流解耦”的消息通信服务。使用了MQ后,消息发送上游只需要依赖MQ,不用依赖其他服务。
#### (2)、为什么要用MQ
##### (a)、流量消峰
##### (b)、应用解耦
##### (c)、异步处理
#### (3)、MQ的分类
##### (a)、ActiveMQ
**优点**:单机吞吐量万级,时效性ms级,可用性高,基于主从架构高可用性,消息可靠性较低的概率丢失数据。
**缺点**:官方社区对ActiveMQ5.x维护减少,高吞吐量场景使用较少。
##### (b)、Kafka
大数据杀手锏,谈到大数据领域内的消息传输,则绕不开Kafka,这款为大数据而生的消息中间件,以其百万级TPS的吞吐量名声大噪,迅速成为大数据领域的宠儿,在数据采集、传输、存储的过程中发挥着举足轻重的作用。
**优点**:性能卓越,单机写入TPS约在百万条/秒,最大的优点,就是吞吐量高。时效性ms级可用性非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用,消费者采用Pull方式获取消息,消息有序,通过控制能够保证所有消息被消费且仅被消费一次;有优秀的第三方管理界面Kafka-Manager;在日志领域比较成熟,被多家公司和多个开源项目使用;功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用。
**缺点**:Kafka 单机超过 64 个队列/分区,Load 会发生明显的飙高现象,队列越多,load 越高,发送消息响应时间变长,使用短轮询方式,实时性取决于轮询间隔时间,消费失败不支持重试;支持消息顺序,但是一台代理宕机后,就会产生消息乱序,**社区更新较慢**;
##### (c)、RocketMQ
RocketMQ 出自阿里巴巴的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一些改进。
**优点**:单机吞吐量十万级,可用性非常高,分布式架构,消息可以做到 0 丢失**,**MQ 功能较为完善,还是分布式的,扩展性好,支持 10 亿级别的消息堆积,不会因为堆积导致性能下降,源码是 java 我们可以自己阅读源码,定制自己公司的 MQ。
**缺点**:支持的客户端语言不多,目前是 java 及 c++,其中 c++不成熟;社区活跃度一般,没有在 MQ 核心中去实现 JMS 等接口,有些系统要迁移需要修改大量代码
##### (d)、RabbitMQ
2007 年发布,是一个在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。
**优点**:由于 erlang 语言的高并发特性,性能较好;吞吐量到万级,MQ 功能比较完备,健壮、稳定、易用、跨平台、支持多种语言 如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持 AJAX 文档齐全;开源提供的管理界面非常棒,用起来很好用,社区活跃度高;更新频率相当高ttps://www.rabbitmq.com/news.html
**缺点**:商业版需要收费,学习成本较高
#### (4)、MQ的选择
##### (a)、Kafka
Kafka 主要特点是基于 Pull 的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生**大量数据**的互联网服务的数据收集业务。**大型公司**建议可以选用,如果有**日志采集**功能,肯定是首选 kafka 了。
##### (b)、RocketMQ
天生为**金融互联网**领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况。RoketMQ 在稳定性上可能更值得信赖,这些业务场景在阿里双 11 已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择 RocketMQ。
##### (c)、RabbitMQ
结合 erlang 语言本身的并发优势,性能好**时效性微秒级**,**社区活跃度也比较高**,管理界面用起来十分方便,如果你的**数据量没有那么大**,中小型公司优先选择功能比较完备的 RabbitMQ。
## 2、RabbitMQ
### 1)、概念
RabbitMQ是一个消息中间件,接收并转发消息。
### 2)、四大核心概念
- **生产者**
产生数据并发送消息的程序
- **交换机**
交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定
- **队列**
队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式。
- **消费者**
消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。

### 3)、RabbitMQ核心部分
https://www.rabbitmq.com/getstarted.html

### 4)、名词介绍
- **Broker:表示消息队列服务器实体**
RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现。RabbitMQ是一个消息代理 - 一个消息系统的媒介。它可以为你的应用提供一个通用的消息发送和接收平台,并且保证消息在传输过程中的安全。
- **Message :消息**
- **Publisher :消息的生产者,也是一个向交换器发布消息的客户端应用程序**
- **Exchange :交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。**
Exchange有4种类型:**direct(默认),fanout, topic, 和headers**,不同类型的Exchange转发消息的策略有所区别
- **Queue 消息队列,用来保存消息直到发送给消费者。**
它是消息的容器,也是消息的终点。一个消息 可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
- **Binding 绑定,用于消息队列和交换器之间的关联。**
一个绑定就是基于路由键将交换器和消息队列连 接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。 Exchange 和Queue的绑定可以是多对多的关系。
- **Connection 网络连接,比如一个TCP连接。**
- **Channel 信道,多路复用连接中的一条独立的双向数据流通道。**
信道是建立在真实的TCP连接内的虚 拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这 些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所 以引入了信道的概念,以复用一条 TCP 连接。
- **Consumer 消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。**
- **Virtual Host 虚拟主机,表示一批交换器、消息队列和相关对象。**
虚拟主机是共享相同的身份认证和加 密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有 自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定, RabbitMQ 默认的 vhost 是 / 。
### 5)、安装(windows)
#### (1)、安装Erlang
##### (a)、下载
官方下载地址:https://www.erlang.org/downloads
但是注意,需要根据rabbitmq的版本来,所以我这里下载的是`23.3.4.10`;
下载地址如下:(如果下载太慢,可以使用镜像网站进行下载,如:https://ghproxy.com/)
https://github.com/erlang/otp/releases/download/OTP-23.3.4.10/otp_win64_23.3.4.10.exe

##### (b)、点击安装
这里是.exe文件,一直点下一步即可
##### (c)、配置环境变量
配置`ERLANG_HOME`

配置Path

#### (2)、安装RabbitMQ
##### (a)、下载
进入开始页面,点击下载(由于页面加载较慢,需要等待一会,才能看见下面的页面)

选择压缩包下载

##### (b)、解压缩安装包到指定位置

##### (c)、配置环境变量
**新增RABBITMQ_SERVER变量**

**修改PATH变量**

##### (d)、启动管理插件
注意:启动插件时,rabbitmq服务必须是关闭的
cmd使用`rabbitmq-plugins.bat enable rabbitmq_management` 命令启动管理插件

##### (e)、启动服务
cmd输入`rabbitmq-service.bat install`启动服务
**注意**:这里需要使用管理员权限使用cmd,否则可能出现如下异常

安装成功

启动服务(方式一)

启动服务(方式二)
使用管理员权限启动命令行,使用`rabbitmq-service start`启动服务

##### (f)、查看状态
cmd输入`rabbitmqctl status`命令查看状态
##### (g)、访问页面
地址:http://localhost:15672/,账号密码:guest/guest


### 6、安装(Centos7)
#### (1)、安装Erlang
##### (a)、下载
官方下载地址:https://www.erlang.org/downloads
但是注意,需要根据rabbitmq的版本来,所以我这里下载的是`23.3.4.10`;

##### (b)、安装
解压,并进入目录
```
tar -zxvf otp_src_23.3.4.10.tar.gz
cd otp_src_23.3.4.10
```
安装依赖
```
# 安装依赖
yum -y install gcc glibc-devel make ncurses-devel openssl-devel xmlto perl wget socat gcc-c++ kernel-devel m4 mesa* freeglut*
```
configure到指定目录
```
./configure --prefix=/opt/erlang/erlang_23
```
可能会出现如下错误

```
# 更新epel第三方软件库
yum install -y epel-release
# 再次执行,就可以安装了
yum install –y wxWidgets-devel
yum install wxBase #for /usr/bin/wx-config-3.0
cd /usr/bin
# 创建软连接
ln -s wx-config-3.0 wx-config
```
编译
```
make && make install
```
##### (c)、配置环境变量
配置`ERLANG_HOME、Path
更新配置
```
source /etc/profile
```


#### (2)、安装RabbitMQ
##### (a)、下载
https://www.rabbitmq.com/docs/download
进入开始页面,点击下载(由于页面加载较慢,需要等待一会,才能看见下面的页面),选择压缩包下载
如果需要下载老版本可以去github上找,https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.9.12

##### (b)、解压缩安装包到指定位置
```
tar -xvf rabbitmq-server-generic-unix-3.9.12.tar.xz
```
##### (c)、配置环境变量
**新增RABBITMQ_SERVER变量**
**修改PATH变量**

##### (d)、启动管理插件
注意:启动插件时,rabbitmq服务必须是关闭的
`rabbitmq-plugins enable rabbitmq_management ` 命令启动管理插件

##### (e)、启动服务
rabbitmq-server start
```
# 启动MQ
rabbitmq-server start
# 要后台启动可以使用
rabbitmq-server -detached
# 关闭MQ
rabbitmqctl stop
# 查看状态
rabbitmqctl status
rabbitmqctl list_users // 列出当前系统的用户
rabbitmqctl list_user_permissions [用户名] // 查看用户权限
rabbitmqctl delete_user [用户名] // 删除用户
rabbitmqctl change_password [用户名] [密码] // 修改用户密码
rabbitmq-plugins enable rabbitmq_management // 开启web远程管理界面
rabbitmq-plugins disable rabbitmq_management // 关闭web远程管理界面
systemctl start rabbitmq-server // 启动rabbitmq-server服务
systemctl stop rabbitmq-server // 停止rabbitmq-server服务
systemctl restart rabbitmq-server // 重启rabbitmq-server服务
systemctl reload rabbitmq-server // 重载rabbitmq-server服务 (推荐使用)
systemctl status rabbitmq-server // 查看rabbitmq-server状态
systemctl enable rabbitmq-server // 设置为开机启动rabbitmq-server服务
```

##### (f)、新增用户
执行下面的命令创建一个用户
rabbitmqctl add_user 用户名 密码
执行下面的命令设置用户为超级管理员。
rabbitmqctl set_user_tags 用户名 administrator
设置 admin 用户的权限,指定允许访问的vhost以及write/read
rabbitmqctl set_permissions -p "/" 用户名 ".*" ".*" ".*"
成功登录

# 二、核心
## 1、Hello world
官网地址:https://www.rabbitmq.com/tutorials/tutorial-one-java.html
在本教程的这一部分中,我们将用 Java 编写两个程序。发送单个消息的生产者和接收消息并打印 出来的消费者。我们将介绍 Java API 中的一些细节。
在下图中,“ P”是我们的生产者,“ C”是我们的消费者。中间的框是一个队列-RabbitMQ 代 表使用者保留的消息缓冲区
![(images/python-one.png) -> [|||] -> (C)](https://www.rabbitmq.com/img/tutorials/python-one.png)
### 1)、依赖
```xml
com.rabbitmq
amqp-client
5.9.0
commons-io
commons-io
2.8.0
```
### 2)、生产者
1. 创建连接
- 创建连接工厂
- 配置连接信息
- 创建连接
2. 创建队列
- 获得信道
- 创建队列
3. 发送消息
- 消息文本
- 发送消息
4. 关闭连接
示例:[生产者示例代码](01-hello/src/main/java/com/liyu/rabbit/Producer.java)
### 3)、消费者
1. 创建连接
- 创建连接工厂
- 配置连接信息
- 创建连接
2. 监听消息队列
- 创建信道
- 消费消息
示例:[消费者示例代码](01-hello/src/main/java/com/liyu/rabbit/Consumer.java)
注意:消费者不需要关闭连接
### 4)、效果
生产者生产消息

消费者消费消息

## 2、Work Queues
工作队列(又名:任务队列)是为了避免立即执行一项资源密集型任务,而不得不等待它完成。相反,我们将任务安排在以后完成。我们把任务封装成一个消息发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当运行许多工作线程时,任务将在它们之间共享。
### 1)、循环调度
在这个案例中我们会启动两个工作线程(消费者),一个消息发送线程(生产者),我们来看看他们两个工作线程 是如何工作的。
#### (1)、工厂类
在1中,我们发现创建信道(Channel)的这部分代码在生产者和消费者中是重复的,因此可以创建一个工厂类来创建信道。
示例:[示例代码](02-work-queues/src/main/java/com/liyu/rabbit/factory/RabbitFactory.java)
#### (2)、消费者
在消费者中,我们利用线程池,创建两个消费者线程,去消费消息
示例:[示例代码](02-work-queues/src/main/java/com/liyu/rabbit/roll/Consumer.java)
#### (3)、生产者
在生产者中,我们通过控制台,重复输入消息
示例:[示例代码](02-work-queues/src/main/java/com/liyu/rabbit/Producer.java)
#### (4)、效果
我们先再生产者这边发送几条消息,再去消费者查看消费情况,发现消费线程是轮询进行消费的


### 2)、消息确认
#### (1)、概念
消费者接收并处理完一条消息需要一定的时间,在这个过程中,如果消费者还没处理完,那么就会发生消息丢失。
为了保证消息在消费过程中不丢失,rabbitmq引入了消息应答机制:消费者在接收到消息并处理完消息之后,告诉rabbitmq它已经处理了,rabbitmq可以把该消息删除了。
#### (2)、自动应答
消息发送后即被认为已经传输成功,这种模式吞吐量较高,但是容易发生消息丢失,也有可能导致消息者消费过载,接收到太多来不及处理的消息,最终导致消息积压,内存耗尽。
#### (3)、消息应答的方法
`Channel.basicAck`:肯定确定
`Channel.basicNack`:否定确认
`Channel.basicReject`: 否定确认,和`Channel.basicNack`的区别:缺少`multiple`批量应答参数
#### (4)、修改消费者
- 修改消费`Channel.basicConsume`方法的`autoAck`参数未`false`
- 使用`basicNack`方法进行否认确认或`basicAck`肯定确认应答
示例:[示例代码](02-work-queues/src/main/java/com/liyu/rabbit/roll/ManualConsumer.java)
#### (5)、效果
消费失败的消息会重新进入队列,由其他队列消费

#### (6)、注意
不要忘记应答
思考:如果在生产者发消息时,MQ挂了,这个时候生产者不知道,怎么办?具体请看发布订阅模式
### 3)、消息持久化
#### (1)、修改生产者
- `channel.queueDeclare`的`durable`参数修改为`true`,使队列持久化
- `channel.basicPublish`的props参数使用`MessageProperties.PERSISTENT_TEXT_PLAIN`,表示持久化消息
示例:[示例代码](02-work-queues/src/main/java/com/liyu/rabbit/Producer.java)
注意:这种错误表示,要先删除原有的队列

出现D,表示是持久化的

#### (2)、效果
生产者生产几条消息,我们重启rabbit服务,看看消息还在不在

查看时,消息仍在

### 4)、公平调度
#### (1)、概念
假如有两个消费者,其中一个消费者处理很慢,一个处理很快,这个时候,我们采用循环调度的方式,这是**公平调度**,但是我们希望处理更快的消费者能处理更多的消息,这就是**不公平调度**

#### (2)、处理
`Channel.basicQos`方法可以设置消费者同时最多可以接收处理多少条消息,如果是1,那么在它确认应答之前,不向它发送新消息,而把它分配给其他可以处理的消费者
#### (3)、示例
示例:[示例代码](02-work-queues/src/main/java/com/liyu/rabbit/roll/FairConsumer.java)
创建两个消费者,一个处理消息要阻塞5秒,另一个直接处理,发送5条消息,查看效果
理想效果:阻塞的消费者只处理一条消息,其他消息都被另一个消费者处理

#### (4)、思考
假设现在有两个消费者A、B,他们处理速度差不多,这个时候我们给A的qos一个设置成1,B设成2;这种情况下,如果同时出现大量请求,会有什么效果。
B处理的请求大概是A的2倍。
## 3、发布订阅
向多个消费者传递一条消息,这种模式叫发布订阅。生产者发送一条消息时,mq向多个消费者(不一定是全部)进行广播。
### 1)、交换机
#### (1)、简介
回顾之前的内容,我们可以发现
- **生产者**——发送消息
- **消息队列**——存储消息
- **消费者**——接收消息
在这里,引入一个新的概念——**交换机**,接收生产者的消息并分发给消息队列
#### (2)、交换机的类型
- **`direct`**——直连交换机
根据消息携带的路由键(routing key)将消息投递给对应队列的。
- **`topic`**——主题交换机
队列通过路由键绑定到交换机上,然后,交换机根据消息里的路由值,将消息路由给一个或多个绑定队列。
- **`headers`** ——头交换机
头交换机使用多个消息头属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。
- **`fanout`**——扇型交换机
将消息路由给绑定到它身上的所有队列。
除此之外还有两种特殊的交换机
- **`default`**——默认交换机
实际上是一个由RabbitMQ预先声明好的名字为空字符串的直连交换机
- **`Dead Letter `**——死信交换机
当消费者不能处理接收到的消息时,将这个消息重新发布到另外一个队列中,等待重试或者人工干预。
### 2)、扇出(fanout)简单实例
将消息路由给绑定到它身上的所有队列。
#### (1)、生产者
示例:[示例代码](03-publish-subscribe/src/main/java/com/liyu/rabbit/Producer.java)
- 初始参数
```java
// 队列名称
public static final String QUEUE_NAME = "hello";
public static final String QUEUE_NAME2 = "hello2";
// 交换机名称
public static final String EXCHANGE_NAME = "ex_hello";
```
- 获得通道
- 声明并创建队列
```java
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueDeclare(QUEUE_NAME2,true, false, false, null);
```
- 设置交换机名称和类型
```java
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
```
- 绑定消息队列
```java
// 绑定交接机-路由-消息队列
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "rk_" +QUEUE_NAME);
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "rk_" +QUEUE_NAME2);
```
- 发布消息
```java
// 发送消息 这里要设置交换机名称,但是不设置路由
// MessageProperties.PERSISTENT_TEXT_PLAIN:表示持久化
channel.basicPublish(EXCHANGE_NAME,"", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes(StandardCharsets.UTF_8));
```
#### (2)、效果
使用生产者发送一条消息

**理想效果**:hello和hello2俩个消息队列都有该消息
**实际效果**:hello和hello2俩个消息队列都有该消息,实际如下
hello队列:

hello2队列

### 3)、发布确认
使用消息代理(如 RabbitMQ)的系统按照定义是分布式的。由于发送的协议方法(消息)不能保证到达对等方或被对等方成功处理,因此发布者和消费者都需要一种用于传递和处理确认的机制。
https://www.rabbitmq.com/confirms.html
##### 消费者交付确认
当 RabbitMQ 向消费者传递消息时,它需要知道何时认为消息发送成功。在[2.2.2【消息确认】](#2)、消息确认)中已经接触过;
##### 传递标签
在我们继续讨论其他主题之前,重要的是解释如何识别交付(并且确认表明它们各自的交付)。当注册消费者(订阅)时,RabbitMQ 将使用`basic.deliver` 方法传递(推送)消息。该方法带有一个传递标签`delivery tag`,它唯一地标识一个通道上的传递。因此,交付标签的范围是每个渠道。
交付标签是单调增长的正整数,并由客户端库呈现。确认交付的客户端库方法将交付标签作为参数
##### 生产者发布确认
https://www.rabbitmq.com/confirms.html#publisher-confirms
生产者发送消息到消息队列的过程中,可能因为网络,或其他原因,导致消息持久化失败,但是这个时候生产者并不知道,所以发生了消息丢失。
保证消息不丢失的唯一方法是使用事务——使通道具有事务性,然后为每条消息或一组消息发布、提交。
#### (1)、单个发布确认
示例:[示例代码](03-publish-subscribe/src/main/java/com/liyu/rabbit/PublisherConfirm.java)中的`singleConfirm`方法
步骤如下:
- 创建信道
- 开启发布确认
```java
channel.confirmSelect();
```
- 创建队列
- 发送单条消息
- 获得等待确认结果
```java
boolean b = channel.waitForConfirms();
```
#### (2)、批量发布确认
示例:[示例代码](03-publish-subscribe/src/main/java/com/liyu/rabbit/PublisherConfirm.java)中的`batchConfirm`方法
步骤如下:
- 创建信道
- 开启发布确认
- 创建队列
- 发送多条消息
- 获得等待确认结果
#### (3)、异步发布确认消息
示例:[示例代码](03-publish-subscribe/src/main/java/com/liyu/rabbit/PublisherConfirm.java)中的`asyncConfirm`方法
步骤如下:
- 创建信道
- 开启发布确认
- 创建队列
- 添加发布确认监听器
```java
ConfirmCallback ackCallback = (deliveryTag, multiple) ->{
// System.out.println("消息发送失败:" +deliveryTag);
};
ConfirmCallback nackCallback = (deliveryTag, multiple) ->{
System.out.println("消息发送失败:" +deliveryTag);
};
channel.addConfirmListener( ackCallback, nackCallback);
```
- 发送消息
#### (4)、三种方式发送1000条消息的效率比较

## 4、路由
在发布订阅模式中,我们绑定队列和交换机时,设置了一个routingKey参数,它用来表示交换机和队列的关系。

### 1)、Direct直连交换机
交换机将消息分发给完全匹配的队列。
示例:[示例代码](04-routing-key/src/main/java/com/liyu/rabbit/producer/DirectProducer.java)
##### 步骤:
- 创建信道
- 开启发布确认
- 添加发布确认监听
- 声明直连交换机
- 声明队列
- 建立交换机和队列路由关系
- 根据路由发送消息
##### 效果:


### 2)、Topic主题交换机
#### 简介:
主题交换机的路由必须由单词列表和"."号组成,如“hello.world”,这些单词可以是任何字符,但是一般使用和消息相关的一些特性。主题的交换机和直接交换机类似,都是使用路由匹配发送到指定队列。但是主题有两个特殊情况
- `*`(星号)可以只替换一个单词。
- `#` (hash) 可以代替零个或多个单词。
如:在以下示例中,

| 路由key | 匹配路由 | 匹配队列 |
| --------------- | -------------------------- | -------- |
| a.orange.b | \*.orange.\* | Q1 |
| a.orange | \ | 无 |
| orange.b | \ | 无 |
| a.b.rabbit | \*.\*.rabbit | Q2 |
| a.b.rabbit.c | \ | 无 |
| a.orange.rabbit | \*.orange.\*、\*.\*.rabbit | Q1、Q2 |
| lazy | lazy.\# | Q2 |
| lazy.a | lazy.\# | Q2 |
| lazy.a.b.c | lazy.\# | Q2 |
| lazy.orange.a | lazy.\#、\*.orange.\* | Q1、Q2 |
| a.lazy.b | \ | 无 |
| a.lazy.rabbit | \*.\*.rabbit | Q2 |
#### (1)、示例
##### 生产者:
- 创建连接、创建信道
- 开启发布确认、添加发布确实监听
- 声明交换机、声明队列
- 绑定交换机和队列
- 发送消息
示例:[示例代码](04-routing-key/src/main/java/com/liyu/rabbit/producer/TopicProducer.java)
##### 消费者:
- 创建连接、创建信道
- 实现消费消息代码
示例:[示例代码](04-routing-key/src/main/java/com/liyu/rabbit/consumer/Consumer.java)
#### (2)、效果


### 3)、死信队列
产生死信的条件,官方文档:https://www.rabbitmq.com/dlx.html
- 消费者进行否定确认,并将参数requeue设置为false;
- 消息超过设定的ttl有效时间
- 消息超过队列最大可用长度
#### (1)、生产者
- 创建信道、开启发布确认
- 声明普通交换机
- 绑定普通交换机和队列
- 创建消息序列化参数
expiration表示ttl有效时间
```java
BasicProperties basicProperties = new BasicProperties("text/plain",
null,
null,
2,
0, null, null,
// 到期时间:10s
// "10000",
null,
null, null, null, null,
null, null);
```
- 发送消息
```java
channel.basicPublish(DlxConsumer.NORMAL_HELLO_EXCHANGE,DlxConsumer.NORMAL_HELLO_QUEUE, basicProperties, msg.getBytes(StandardCharsets.UTF_8));
```
#### (2)、消费者
- 创建信道
- 声明普通队列
使用arguments绑定死信队列、死信路由、设置队列最大长度等
```java
/* start----------------------------------- 普通正常队列 ----------------------------- */
Map arguments = new HashMap<>();
// 死信交换机
arguments.put("x-dead-letter-exchange", DLX_HELLO_EXCHANGE);
// 死信路由key
arguments.put( "x-dead-letter-routing-key" , DLX_HELLO_QUEUE );
// 设置队列最大长度
arguments.put("x-max-length", 2);
// 过期时间:建议生产者设置
// arguments.put( "x-message-ttl" , 1000 );
channel.queueDeclare(NORMAL_HELLO_QUEUE, false, false, false, arguments);
channel.queueBind(NORMAL_HELLO_QUEUE, NORMAL_HELLO_EXCHANGE, NORMAL_HELLO_QUEUE);
```
- 普通消费者监听消费
注意:测试过期时间、最大可用长度时,我们不消费队列
```java
// 取消消费逻辑,保证消息存在于队列中不被消费
DeliverCallback callback = (consumerTag, message) -> {
System.out.println("普通队列"+NORMAL_HELLO_QUEUE+"拒绝消费了一条消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
channel.basicNack(message.getEnvelope().getDeliveryTag(), false, false);
};
channel.basicConsume(NORMAL_HELLO_QUEUE, false, callback, consumerTag -> {});
```
#### (3)、死信队列和交换机
- 创建信道
- 声明死信队列
- 声明死信交换机
- 绑定死信队列和交换机
- 死信队列消费逻辑
```java
/* start--------------------------------- 死信队列 ---------------------------------- */
// 这里需要死信交换机
channel.exchangeDeclare(DLX_HELLO_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.queueDeclare(DLX_HELLO_QUEUE, false, false, false, null);
channel.queueBind(DlxConsumer.DLX_HELLO_QUEUE, DLX_HELLO_EXCHANGE, DLX_HELLO_QUEUE);
DeliverCallback dlxcallback = (consumerTag, message) -> {
System.out.println("死信队列"+ DlxConsumer.DLX_HELLO_QUEUE+"消费了一条被溢出的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
};
channel.basicConsume(DlxConsumer.DLX_HELLO_QUEUE, true, dlxcallback, consumerTag -> {});
/* end--------------------------------- 死信队列 ---------------------------------- */
```
#### (4)、示例代码
生产者:[示例代码](04-routing-key/src/main/java/com/liyu/rabbit/consumer/DlxConsumer.java)
消费者(包括死信部分):[示例代码](04-routing-key/src/main/java/com/liyu/rabbit/producer/DlxProducer.java)
## 5、延迟队列
### 1)、利用死信队列+ttl(不推荐)
在2.4.3中, 我们知道,ttl消息过期可以触发死信队列,如果ttl的消息没有消费者,那么在到达过期时间后,会自动进入死信队列,这个可以实现延迟队列;
ttl不推荐的原因是,队列按照先进先出原则,只会判断第一条消息会不会过期,这就会导致后面的消息可能没及时过期进入死信队列
### 2)、插件实现
#### (1)、安装插件
rabbitmq插件基础知识:https://www.rabbitmq.com/plugins.html
rabbitmq社区插件中心地址:https://www.rabbitmq.com/community-plugins.html
我们在这里可以找到延迟插件`rabbitmq_delayed_message_exchange`

下载ez文件,并保存到plugins目录下

启用插件
```shell
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
```
然后重启服务,见到下面的类型则安装成功

#### (2)、示例
##### 生产者:[示例代码](05-delayed-message/src/main/java/com/liyu/rabbit/producer/Producer.java)

##### **消费者:**[示例代码](05-delayed-message/src/main/java/com/liyu/rabbit/consumer/Consumer.java)
#### (3)、效果
查看下面生产者和消费的时间,是否延迟了10秒


# 三、整合springboot
## 1、Hello world
### 1)、依赖
```xml
org.springframework.boot
spring-boot-starter-amqp
org.springframework.boot
spring-boot-starter-web
```
### 2)、yaml
```yaml
server:
port: 8000
spring:
application:
name: rabbit-boot
rabbitmq:
username: guest
password: guest
host: localhost
port: 5672
```
### 3)、组件
分成三步
1. 创建交换机
2. 创建队列
3. 绑定队列到交换机
[示例代码](06-springboot/src/main/java/com/liyu/config/DirectComment.java)
```java
public static final String DIRECT_EXCHANGE = "direct.ex.hello";
public static final String DIRECT_QUEUE = "direct.queue.hello";
public static final String DIRECT_RK = "direct.rk.hello";
/**
* 声明交换机
* @return 交换机
*/
@Bean("directExchange")
public Exchange directExchange() {
return new DirectExchange(DIRECT_EXCHANGE);
}
/**
* 声明队列
* @return 队列
*/
@Bean("directQueue")
public Queue directQueue() {
return new Queue(DIRECT_QUEUE);
}
/**
* 绑定队列和交换机
* @param directQueue 队列
* @param directExchange 交换机
* @return 绑定关系
*/
@Bean("directBinding")
public Binding directBinding(Queue directQueue, Exchange directExchange) {
return BindingBuilder.bind(directQueue).to(directExchange).with(DIRECT_RK).and(Collections.emptyMap());
}
```
### 4)、生产者-发送消息
我们写一个Controller,使用rabbitTemplate发送消息
[示例代码](06-springboot/src/main/java/com/liyu/controller/DirectController.java)
```java
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("{msg}")
public void send(@PathVariable("msg") String msg) {
rabbitTemplate.convertAndSend(DirectComment.DIRECT_EXCHANGE, DirectComment.DIRECT_RK, msg);
log.info("消息发送成功,交换机:{},路由:{},消息:{}", DirectComment.DIRECT_EXCHANGE, DirectComment.DIRECT_RK, msg);
}
```
### 5)、消费者-监听队列
[示例代码](06-springboot/src/main/java/com/liyu/listener/DirectConsumer.java)
```java
@RabbitListener(queues = DirectComment.DIRECT_QUEUE)
public void consume(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("队列{}消费了一条消息:{}", DirectComment.DIRECT_QUEUE, msg);
// 手动应答,注意默认是自动应答,所以这里不需要设置手动应答
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
```
### 6)、效果
通过url请求controller发送消息,查看日志

## 2、死信队列
### 1)、TTL、最大队列长度
[DlxComment.java(组件)](06-springboot/src/main/java/com/liyu/config/DlxComment.java)
[DlxController.java(生产者)](06-springboot/src/main/java/com/liyu/controller/DlxController.java)
[DlxConsumer.java(消费者)](06-springboot/src/main/java/com/liyu/listener/DlxConsumer.java)
#### (1)、普通队列组件
分成三步
1. 创建交换机
2. 创建队列
3. 绑定队列到交换机
- **普通交换机**
```java
@Bean("normalExchange")
public Exchange normalExchange() {
return new ExchangeBuilder(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT.getType()).build();
}
```
- **普通队列**
这里绑定死信交换机和死信队列;也可以在这里设置最大长度
```java
/**
* 普通队列
*/
@Bean("normalQueue")
public Queue normalQueue() {
// 注意:绑定死信队列和死信交换机
return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DLX_EXCHANGE).deadLetterRoutingKey(DLX_RK).build();
}
```
```java
/**
* 普通队列(最大队列长度)
*/
@Bean("normalMaxLengthQueue")
public Queue normalMaxLengthQueue() {
// 注意:绑定死信队列和死信交换机,并设置最大队列长度
return QueueBuilder.durable(NORMAL_MAXLENGTH_QUEUE).deadLetterExchange(DLX_EXCHANGE).deadLetterRoutingKey(DLX_RK).maxLength(5).build();
}
```
- **绑定普通队列到普通交换机**
```java
/**
* 绑定普通队列到普通交换机
*/
@Bean("normalBinding")
public Binding normalBinding(Exchange normalExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_RK).noargs();
}
```
```java
/**
* 绑定普通队列(最大队列长度)到普通交换机
* @param normalExchange
* @param normalMaxLengthQueue
* @return
*/
@Bean("normalMaxLengthBinding")
public Binding normalMaxLengthBinding(Exchange normalExchange, Queue normalMaxLengthQueue) {
return BindingBuilder.bind(normalMaxLengthQueue).to(normalExchange).with(NORMAL_MAXLENGTH_RK).noargs();
}
```
#### (2)、死信队列组件
和上面一样分三步
分成三步
1. 创建交换机
2. 创建队列
3. 绑定队列到交换机
```java
/**
* 死信交换机
*/
@Bean("dlxExchange")
public Exchange dlxExchange(){
return new ExchangeBuilder(DLX_EXCHANGE, BuiltinExchangeType.DIRECT.getType()).build();
}
/**
* 死信队列
*/
@Bean("dlxQueue")
public Queue dlxQueue() {
return QueueBuilder.durable(DLX_QUEUE).build();
}
/**
* 绑定死信队列到死信交换机
*/
@Bean("dlxBinding")
public Binding dlxBinding(Exchange dlxExchange, Queue dlxQueue) {
return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(DLX_RK).noargs();
}
```
#### (3)、普通队列监听(消费)
这里要测试死信队列,如果普通队列消费了,就不是那么直观,所以这里不配置普通队列的消费者
#### (4)、死信队列监听(消费)
```java
@RabbitListener(queues = DlxComment.DLX_QUEUE)
public void dlxConsume(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("死信队列{}在{}消费了一条消息:{}", DlxComment.DLX_QUEUE, DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"), msg);
}
```
#### (5)、Controller(生产)
ttl测试生产消息
```java
@GetMapping("{msg}")
public void send(@PathVariable("msg") String msg) {
rabbitTemplate.convertAndSend(DlxComment.NORMAL_EXCHANGE, DlxComment.NORMAL_RK, msg, message -> {
MessageProperties messageProperties = message.getMessageProperties();
// 设置十秒过期时间
messageProperties.setExpiration("10000");
return message;
});
log.info("消息发送成功,交换机:{},路由:{},时间:{},消息:{}", DlxComment.NORMAL_EXCHANGE, DlxComment.NORMAL_RK, DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"), msg);
}
```
超过最大队列长度的消费者
```
@GetMapping("maxlength/{msg}")
public void sendMaxLength(@PathVariable("msg") String msg) {
for (int i = 1; i <= 10; i ++ ) {
rabbitTemplate.convertAndSend(DlxComment.NORMAL_EXCHANGE, DlxComment.NORMAL_MAXLENGTH_RK, msg+i, message -> message);
log.info("消息发送成功,交换机:{},路由:{},时间:{},消息:{}", DlxComment.NORMAL_EXCHANGE, DlxComment.NORMAL_MAXLENGTH_RK, DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"), msg + 1);
}
}
```
#### (6)、效果
ttl效果,可以看到,正好10秒后,死信队列消费到了消息


超过最大队列长度测试,我们发送了10条消息,正好有5条进入死信队列


### 2)、否定确定
在TTL的基础上,加上普通队列消费者
```java
/**
* 普通队列消费者
*/
@RabbitListener(queues = DlxComment.NORMAL_QUEUE)
public void normalConsume(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
log.info("普通队列{}在{}否定应答了一条消息:{}", DlxComment.NORMAL_QUEUE, DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"), msg);
}
```
并配置开启手动应答

效果

## 3、延迟队列
#### (1)、组件
- 交换机
用`delayed()`方法设置这是一个延迟队列
```java
@Bean("delayedExchange")
public Exchange delayedExchange() {
return new ExchangeBuilder(DELAYED_EXCHANGE, BuiltinExchangeType.DIRECT.getType()).delayed().build();
}
```
- 消息队列
```java
@Bean("delayedQueue")
public Queue delayedQueue() {
return QueueBuilder.durable(DELAYED_QUEUE).build();
}
```
- 绑定关系
```java
public Binding delayedBinding(Queue delayedQueue, Exchange delayedExchange) {
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
```
#### (2)、生产者Controller
```java
@GetMapping("{msg}/{delay}")
public void send(@PathVariable("msg") String msg, @PathVariable(value = "delay", required = false) Integer delay) {
delay = ObjectUtils.isEmpty(delay) ? 10000 : delay;
Integer finalDelay = delay;
rabbitTemplate.convertAndSend(DelayedComment.DELAYED_EXCHANGE, DelayedComment.DELAYED_ROUTING_KEY, msg, message -> {
MessageProperties messageProperties = message.getMessageProperties();
messageProperties.setDelay(finalDelay);
return message;
});
log.info("消息发送成功,时间:{},延迟:{},消息:{},交换机:{},路由:{}",
DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"),
finalDelay,
msg,
DelayedComment.DELAYED_EXCHANGE,
DelayedComment.DELAYED_EXCHANGE
);
}
```
#### (3)、消费者监听器
```java
@RabbitListener(queues = DelayedComment.DELAYED_QUEUE)
public void consume(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("延迟队列{}于{}消费了一条消息:{}", message.getMessageProperties().getConsumerQueue(), DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"), msg);
}
```
#### (4)、效果

## 4、发布确认
### 1)、交换机发布确认
#### (1)、配置开启发布确认
```yaml
server:
port: 8000
spring:
application:
name: rabbit-boot-publish
rabbitmq:
username: guest
password: guest
host: localhost
port: 5672
listener:
simple:
acknowledge-mode: manual
# 发布确认模式
# correlated:使用CorrelationData将确认与发送的消息关联起来。
# simple:在作用域内使用RabbitTemplate# waitforconfirmation()(或waitForConfirmsOrDie())。
# none:发布者确认被禁用(默认)。
publisher-confirm-type: correlated
```
#### (2)、发布消息,并添加发布确认
```java
@Resource
private RabbitTemplate rabbitTemplate;
// 添加监听
@PostConstruct
private void init() {
rabbitTemplate.setConfirmCallback((@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause)->{
String id = ObjectUtils.isEmpty(correlationData) ? "" : correlationData.getId();
if (ack) {
log.info("消息发送成功,id={}",id);
} else {
log.warn("消息发送失败,id={},\n异常信息={}", id, cause);
}
});
}
@GetMapping("{id}/{msg}")
public void sendNoExchange(@PathVariable("id") String id, @PathVariable("msg") String msg) {
// 设置correlationData,发布确认时,我们通过这个匹配信息
CorrelationData correlationData = new CorrelationData();
correlationData.setId(id);
rabbitTemplate.convertAndSend("ex.no", "rk.no", msg, message -> {
MessageProperties messageProperties = message.getMessageProperties();
// 设置十秒过期时间
messageProperties.setExpiration("10000");
return message;
}, correlationData);
}
```
#### (3)、效果
**交换机不存在时:**
发送请求:

效果:

**消息队列不存在时**


发现如果交换机存在,但是队列不存在,这时候提示发送成功,这个时候发生了消息丢失
### 2)、消息回退
在上一个部分中,交换机无法通过路由找到交换机时,消息丢失;我们可以设置消息回退防止消息丢失;
#### (1)、yaml配置
在上例的配置基础上添加`spring.rabbitmq.publisher-returns= true`

#### (2)、发送消息时,添加回退监听

#### (3)、效果

### 3)、备份交换机
概述:在前面,我们因为没有绑定队列或没有匹配的队列,导致消息发送失败,这个时候我们可以设置备份队列,通过备份交换机和队列重新发送消息。甚至可以添加异常消息监听。
#### (1)、yaml
同上
#### (2)、组件
```java
public static final String NORMAL_EXCHANGE = "normal.ex.hello";
public static final String HELLO_RK = "rk.hello";
public static final String ALTERNATE_EXCHANGE = "alternate.exhange.hello";
public static final String ALTERNATE_QUEUE = "alternate.queue.hello";
public static final String WARNING_RK = "#";
public static final String WARNING_QUEUE = "warning.queue.hello";
@Bean("normalExchange")
public Exchange normalExchange() {
return new ExchangeBuilder(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT.getType()).alternate(ALTERNATE_EXCHANGE).build();
}
@Bean("alternateExchange")
public Exchange alternateExchange() {
return new ExchangeBuilder(ALTERNATE_EXCHANGE, BuiltinExchangeType.TOPIC.getType()).build();
}
@Bean("alternateQueue")
public Queue alternateQueue(){
return QueueBuilder.durable(ALTERNATE_QUEUE).build();
}
@Bean("warningQueue")
public Queue warningQueue() {
return QueueBuilder.durable(WARNING_QUEUE).build();
}
@Bean("alternateBinding")
public Binding alternateBinding(Exchange alternateExchange, Queue alternateQueue) {
return BindingBuilder.bind(alternateQueue).to(alternateExchange).with(HELLO_RK).noargs();
}
@Bean("warnBinding")
public Binding warnBinding(Exchange alternateExchange, Queue warningQueue) {
return BindingBuilder.bind(warningQueue).to(alternateExchange).with(WARNING_RK).noargs();
}
```
#### (3)、发送消息
```java
/**
* 备份交换机测试
*/
@GetMapping("alternate/{id}/{msg}")
public void alternate(@PathVariable("id") String id, @PathVariable("msg") String msg) {
// 设置correlationData,发布确认时,我们通过这个匹配信息
CorrelationData correlationData = new CorrelationData();
correlationData.setId(id);
rabbitTemplate.convertAndSend(AlternateExchangeCommon.NORMAL_EXCHANGE, AlternateExchangeCommon.HELLO_RK, msg, message -> message, correlationData);
}
```
#### (4)、消息监听
```java
@RabbitListener(queues = AlternateExchangeCommon.ALTERNATE_QUEUE)
public void alterQueueListener(Channel channel, Message message) throws IOException {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
System.out.println("备份队列接收到信息:" + msg);
}
@RabbitListener(queues = AlternateExchangeCommon.WARNING_QUEUE)
public void warnQueueListener(Channel channel, Message message) throws IOException {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
System.out.println("异常队列接收到信息:" + msg);
}
```
#### (5)、效果

### 4)、优先队列
- 声明优先队列
设置最大优先值`x-max-priority`,最大225,建议1-10
- 发送消息时设置优先值`x-priority`,值越大,优先级越高
#### (1)、组件
```java
public static final String PRIORITY_EXCHANGE = "priority.exchange.hello";
public static final String PRIORITY_QUEUE = "priority.queue.hello";
public static final String PRIORITY_ROUTING_KEY = "priority.rk.hello";
@Bean("priorityExchange")
public Exchange priorityExchange() {
return new ExchangeBuilder(PRIORITY_EXCHANGE, BuiltinExchangeType.DIRECT.getType()).build();
}
@Bean("priorityQueue")
public Queue priorityQueue() {
return QueueBuilder.durable(PRIORITY_QUEUE).maxPriority(10).build();
}
@Bean("priorityBinding")
public Binding priorityBinding(Exchange priorityExchange, Queue priorityQueue) {
return BindingBuilder.bind(priorityQueue).to(priorityExchange).with(PRIORITY_ROUTING_KEY).noargs();
}
```
#### (2)、发送消息
```java
/**
* 优先级
*/
@GetMapping("priority")
public void priority() {
sendPriority(1, "消息A");
sendPriority(2, "消息B");
sendPriority(3, "消息C");
sendPriority(4, "消息D");
sendPriority(5, "消息E");
}
private void sendPriority(int priority, String msg) {
rabbitTemplate.convertAndSend(PriorityExchangeCommon.PRIORITY_EXCHANGE, PriorityExchangeCommon.PRIORITY_ROUTING_KEY, msg, message -> {
MessageProperties properties = message.getMessageProperties();
properties.setPriority(priority);
return message;
});
log.info("成功发送消息{},优先级:{}", msg, priority);
}
```
#### (3)、消息监听
注意:等消息发送完成后,再添加消息监听,否则因为消息发送就被成功消费,是看不到效果的
```java
@RabbitListener(queues = PriorityExchangeCommon.PRIORITY_QUEUE)
public void priorityListener(Channel channel, Message message) throws IOException {
log.info("消费消息:{}", new String(message.getBody(), StandardCharsets.UTF_8));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
```
#### (4)、效果
根据设置的消息优先级,消费的消息顺序应该是EDCBA
