# rabbitMq
**Repository Path**: gjlearn/rabbit-mq
## Basic Information
- **Project Name**: rabbitMq
- **Description**: 尚硅谷rabbitMq学习笔记
- **Primary Language**: Java
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2025-09-22
- **Last Updated**: 2025-09-27
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# rabbitMq学习笔记
* 跟随最新的视频学习的
## 为什么需要消息队列


### 响应的速度
!(assets/image-20250922200958412.png)

### 对并发流量的控制


### 削峰限流的好处

### 符合开闭原则
* 对扩展打开, 对修改关闭

### 总结

## 什么是消息队列

### 消息队列实现的主流方式



## rabbitMq引入

### rabbitMq体系结构
#### Producer
* 消息的发送端, 消息的生产者
#### consumer
* 消息的接受端, 消息的消费者
#### Virtual Host
* 每一个Virtual Host就是一个虚拟分组,用户在自己的Virtual Host中使用RabbitMQ组件 在实际开发中,通过Virtual Host区分不同项目、不同功能
#### Exchange
* 交换机,只负责转发,不存储消息,是消息达到Broker的第一站。注意:Exchange(交换机)**只负责转发**消息,**不具备存储**消息的能力,因 此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那 么消息会**丢失**!
##### 交换机的类型
###### Fanout
* 广播,将消息发送给所有绑定到交换机的队列
###### Driect
* 定向,把消息交给符合指定routing key的队列,**Work Queues工作模式 用的是 direct 类型的交换机**
* 如果你不声明,RabbitMQ 会使用一个 **默认的 nameless direct exchange(空字符串 “”)**。
* 当你调用 channel.basicPublish("", "queue_name", ...) 时,实际上就是把消息通过默认的 direct 交换机路由到指定队列。
###### topic
* 通配符,把消息交给符合routing pattern(路由模式)的队列

* 举例



#### queue
* 队列,是消息的容器。消息放在这里等待被消费端取走
#### 工作图

### docker安装rabbitMq并测试
```bash
# 拉取镜像
docker pull rabbitmq:3.13-management
# -d 参数:后台运行 Docker 容器
# --name 参数:设置容器名称
# -p 参数:映射端口号,格式是“宿主机端口号:容器内端口号”。5672供客户端程序访问,15672供后台管理界面访问
# -v 参数:卷映射目录
# -e 参数:设置容器内的环境变量,这里我们设置了登录RabbitMQ管理后台的默认用户和密码
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-v rabbitmq-plugin:/plugins \
-e RABBITMQ_DEFAULT_USER= yourUsername \
-e RABBITMQ_DEFAULT_PASS= yourPassword \
rabbitmq:3.13-management
```
访问后台管理界面:http://ip:15672。 用自己设置的账号密码登录

## 入门案例
[HelloRabbitMq](HelloRabbitMq) 这个模块
### 工作方式 (重点)
• RabbitMQ官网,通过教程的形式,给我们列举了7种RabbitMQ用法
• 网址:https://www.rabbitmq.com/getstarted.html
#### Work Queues
* Work Queues 用的是 direct 类型的交换机

#### Publish/Subscribe
* Publish:发布,把消息发送到交换机上
* Subscribe:订阅,只要把队列和交换机绑定,事实上就形成了一种订阅关系
* 生产者不是把消息直接发送到队列,而是发送到交换机
* 交换机接收消息,而如何处理消息取决于交换机的类型
* 工作机制:消息发送到交换机上,就会以**广播**的形式发送给所有已绑定队列

#### Routing
* 路由模式取决于交换机的类型 , 见交换机的类型
### 工作模式小结
• 直接发送到队列:底层使用了默认交换机
• 经过交换机发送到队列
• Fanout:没有Routing key直接绑定队列
• Direct:通过Routing key绑定队列,消息发送到绑定的队列上
• 一个交换机绑定一个队列:定点发送
• 一个交换机绑定多个队列:广播发送
• Topic:针对Routing key使用通配符
## springBoot中使用rabbitMQ
### 1、导入依赖
```xml
org.springframework.boot
spring-boot-starter-parent
3.1.5
org.springframework.boot
spring-boot-starter-amqp
org.springframework.boot
spring-boot-starter-web
org.projectlombok
lombok
```
### 2、写一个启动类
```java
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class,args);
}
}
```
### 3、写一个消费者consumer
```java
@Component
public class MyMessageListener {
//交换机
public static final String EXCHANGE_DIRECT = "exchange01";
//路由键
public static final String ROUTING_KEY = "A";
//队列名称
public static final String QUEUE_NAME = "chen";
// 写法一可以创建路由队列和他们之间的绑定关系并监听
// @RabbitListener(
// bindings = @QueueBinding(value = @Queue(value = QUEUE_NAME, durable = "true"),
//// exchange = @Exchange(value = EXCHANGE_DIRECT),
// exchange = @Exchange(EXCHANGE_DIRECT), //就一个value属性可以省略value=xx
// key = {ROUTING_KEY}
// )
// )
//写法二不需要创建, 只需要指定要监听的队列即可. 提起是已经都建立好
@RabbitListener(queues = {QUEUE_NAME})
public void proceedMessage(String data, Message message, Channel channel) {
System.out.println("消费端接收到了消息"+data);
}
}
```



* 设置的值在控制台的显示的地方



### 4、写一个生产者 producer , 发送消息
新建某块,导入依赖, 写测试类、
```xml
org.springframework.boot
spring-boot-starter-parent
3.1.5
org.springframework.boot
spring-boot-starter-amqp
org.springframework.boot
spring-boot-starter-test
```
applicaiton.yml
```yml
spring:
rabbitmq:
host: 172.20.10.10
port: 5672
username: guest
password: 123456
virtual-host: /
```
测试类
```java
@SpringBootTest
public class TestRabbit {
//交换机
public static final String EXCHANGE_DIRECT = "exchange01";
//路由键
public static final String ROUTING_KEY = "A";
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void testSend() {
// 根据 交换机和路由键就可以定位到 队列
rabbitTemplate.convertAndSend(EXCHANGE_DIRECT, ROUTING_KEY, "hello rabbit, I am springBoot");
}
}
```
5、 运行测试类,看到消费者接收到消息

## rabbitMQ使用过程中可以遇到的问题
### 故障情况1: 消息没有发送到消息队列(producer端配置)
#### 解决思路A(**生产者端**进行**确认**)
* 在**生产者端**进行**确认**,具体操作中我们会分别针对**交换机**和**队列**来确认, 如果没有成功发送到消息队列服务器上,那就可以尝试重新发送
#### 1、 在配置文件中开启确认
```xml
spring:
rabbitmq:
host: 172.20.10.10
port: 5672
username: guest
password: 123456
virtual-host: /
# 必须加入下面两个配置,才能开启确认
publisher-confirm-type: CORRELATED # 交换机的确认 correlate
publisher-returns: true # 队列的确认
```
#### 2、 创建配置类
在这里我们为什么要创建这个配置类呢?首先,我们需要声明回调函数来接收RabbitMQ服务器返回的确认信息:
| 方法名 | 方法功能 | 所属接口 | 接口所属类 |
| ----------------- | ------------------------ | --------------- | -------------- |
| confirm() | 确认消息是否发送到交换机 | ConfirmCallback | RabbitTemplate |
| returnedMessage() | 确认消息是否发送到队列 | ReturnsCallback | RabbitTemplate |
然后,就是对RabbitTemplate的功能进行增强,因为回调函数所在对象必须设置到RabbitTemplate对象中才能生效。
原本RabbitTemplate对象并没有生产者端消息确认的功能,要给它设置对应的组件才可以。
而设置对应的组件,需要调用RabbitTemplate对象下面两个方法:
| 设置组件调用的方法 | 所需对象类型 |
| -------------------- | ----------------------- |
| setConfirmCallback() | ConfirmCallback接口类型 |
| setReturnCallback() | ReturnCallback接口类型 |
* 配置类
```java
@Configuration
public class RabbitMQACJConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
// 第二部, 设置
@Autowired
private RabbitTemplate rabbitTemplate;
// 创建对象完成后,自动调用这个方法
@PostConstruct
public void init() {
// 把我们自定义的方法设置进去
rabbitTemplate.setConfirmCallback(this); // 路由器
rabbitTemplate.setReturnsCallback(this); //队列
}
// 第一步先重写两个回掉方法
/**
* 消息发送成功、失败发送到交换机时,调用的方法
correlationData – correlation data for the callback.
ack – true for ack, false for nack(no acknowledge)
cause – An optional cause, for nack, when available, otherwise null.
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
//soutp+enter 快速打印参数
System.out.println("correlationData = " + correlationData + ", ack = " + ack + ", cause = " + cause);
}
/**
* 发送到队列失败, 调用这个方法
* @param returned the returned message and metadata.
*/
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.println("消息主体: " + new String(returned.getMessage().getBody()));
System.out.println("应答码: " + returned.getReplyCode());
System.out.println("描述:" + returned.getReplyText());
System.out.println("消息使用的交换器 exchange : " + returned.getExchange());
System.out.println("消息使用的路由键 routing : " + returned.getRoutingKey());
}
}
```
配置类自身实现ConfirmCallback、ReturnCallback这两个接口,然后通过this指针把配置类的对象设置到RabbitTemplate对象中。
操作封装到了一个专门的void init()方法中。
为了保证这个void init()方法在应用启动时被调用,我们使用@PostConstruct注解来修饰这个方法。
关于@PostConstruct注解大家可以参照以下说明:
> @PostConstruct注解是Java中的一个标准注解,它用于指定在对象创建之后立即执行的方法。当使用依赖注入(如Spring框架)或者其他方式创建对象时,@PostConstruct注解可以确保在对象完全初始化之后,执行相应的方法。
>
> 使用@PostConstruct注解的方法必须满足以下条件:
>
> 1. 方法不能有任何参数。
> 2. 方法必须是非静态的。
> 3. 方法不能返回任何值。
>
> 当容器实例化一个带有@PostConstruct注解的Bean时,它会在调用构造函数之后,并在依赖注入完成之前调用被@PostConstruct注解标记的方法。这样,我们可以在该方法中进行一些初始化操作,比如读取配置文件、建立数据库连接等。
#### 3 、测试
之前rabbitTemplate怎么用,现在还怎么用
##### 成功的情况
```java
@Test
void testConfirm() {
rabbitTemplate.convertAndSend(EXCHANGE_DIRECT, ROUTING_KEY, "confirm message");
}
```


##### 路由不对的情况
```java
@Test
void testConfirm() {
rabbitTemplate.convertAndSend(EXCHANGE_DIRECT+"chen", ROUTING_KEY, "confirm message");
}
```

##### 路由键没找到的情况
```java
@Test
void testConfirm() {
rabbitTemplate.convertAndSend(EXCHANGE_DIRECT, ROUTING_KEY+"chen", "confirm message");
}
```

#### 解决思路B:备份交换机
我改了一下名称方便记忆
```java
//交换机
public static final String EXCHANGE_DIRECT = "exchange1";
//路由键
public static final String ROUTING_KEY = "routingKey1";
//队列名称
public static final String QUEUE_NAME = "queue1";
```
* 为目标交换机指定**备份交换机**,当目标交换机投递失败时,把消息投递至 备份交换机

* 用控制面板创建备份交换机器 **backupExchange01**






##### 测试
```java
@Test
void testConfirm() {
// 不能改交换机的名字, 因为备份交换机是和交换机绑定的
rabbitTemplate.convertAndSend(EXCHANGE_DIRECT, ROUTING_KEY+"aa", "confirm message");
}
```


### 故障情况2:消息队列服务器宕机导致内存中消息丢失
* 解决思路:**消息持久化**到硬盘上,哪怕服务器重启也不会导致消息丢失
#### 测试 默认就是持久话的
1、重启rabbitMQ
```bash
docker restart 容器名
```



### 故障情况3: 消费端宕机或抛异常导致消息没有成功被消费

创建新模块, 只有配置文件不同
```yml
spring:
rabbitmq:
host: 172.20.10.10
port: 5672
username: guest
password: 123456
virtual-host: /
listener:
simple:
acknowledge-mode: manual # 把消息确认模式改为手动确认
```
```java
@Component
public class ConfirmRabbitListener {
public static final String QUEUE_Name = "queue1";
@RabbitListener(queues = QUEUE_Name)
public void processMessage(String data, Message message, Channel channel) throws IOException {
//获取 deliveryTag 消息的唯一标识
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//核心业务
System.out.println(data);
//成功了返回AKC
//multiple: 是否批量处理数据
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
//时候被重新投递过
Boolean redelivered = message.getMessageProperties().getRedelivered();
if (redelivered) {
//requeue true, broker会重新投放消息
// false broker会丢掉这个消息
//被重新投递过直接丢弃
channel.basicNack(deliveryTag, false, false);
} else {
//失败了返回NACK
// 没被重新投递过重新投递
channel.basicNack(deliveryTag, false, true);
}
}
}
}
```

##### deliveryTag


##### multiple

##### 控制台发送消息测试

##### 没有异常的情况

##### 有异常的情况


### 消费端限流
 

```java
public static final String QUEUE_Name = "queue1";
/**
* 测试precache
*/
@RabbitListener(queues = QUEUE_Name)
public void processMessageTestPreCatch(String data, Message message, Channel channel) throws IOException, InterruptedException {
//让处理的速度慢一点方便观察
TimeUnit.SECONDS.sleep(1);
// 还是采用手动确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
```

开启prefetch


### 消息超时
#### 设置队列全局超时的时间
* 给**消息**设定一个**过期时间**,超过这个时间没有被取走的消息就会被**删除**
* • **队列层面**:在队列层面设定**消息的过期时间**,并不是队列的过期时间。意思是这 个队列中的消息全部使用**同一个**过期时间。



```java
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void testTimeOut() {
for (int i = 0; i < 100; i++) {
rabbitTemplate.convertAndSend("timeout.exchange","timeout.routingKey" ,"testTimeout");
}
}
```
把消费者停掉

#### MessagePostProcessor设置具体某个消息的过期时间
• **消息本身**:给具体的某个消息设定过期时间
```java
@Test
void testTimeOutPostProcesser() {
MessagePostProcessor messagePostProcessor = message -> {
message.getMessageProperties().setExpiration("6000"); //单位也是毫秒
return message;
};
rabbitTemplate.convertAndSend("timeout.exchange","timeout.routingKey" ,"testTimeout",messagePostProcessor);
}
```

### 死信和死信队列
* 概念:当一个消息无法被消费,它就变成了死信。
#### 死信产生的的原因

#### 测试. 消息由正常队列进入死信队列
* 先创建死信和信息队列 和对应的路由 (被动接受消息)



* 创建正常的交换机和队列 (遇到问题转入到死信队列)





### 消息数量超过队列容纳极限
* 发送消息前, 先把消费端停掉
```
/**
* 测试超时,进入死信队列
*/
@Test
public void testSendMultiMessage() {
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend(
"normal.exchange",
"normal.routingkey",
"测试死信情况2:消息数量超过队列的最大容量" + i);
}
}
```

官网地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
确定卷映射目录
```bash
docker inspect 容器名
```

## 延迟队列
* 应用场景
* 
### 1、 基于死信的延迟队列

### 2、延迟插件
#### 下载和安装
官方文档说明页地址:https://www.rabbitmq.com/community-plugins.html

下载插件安装文件:
```shell
# cd 进入到宿主机对应插件的目录
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez
```
```bash
# 登录进入容器内部
docker exec -it rabbitmq /bin/bash
# rabbitmq-plugins命令所在目录已经配置到$PATH环境变量中了,可以直接调用
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 退出Docker容器
exit
# 重启Docker容器
docker restart rabbitmq
# 看看是否启用
rabbitmq-plugins list
```




#### 创建基于插件的队列和使用

关于x-delayed-type参数的理解:
> 原本指定交换机类型的地方使用了x-delayed-message这个值,那么这个交换机除了支持延迟消息之外,到底是direct、fanout、topic这些类型中的哪一个呢?
>
> 这里就额外使用x-delayed-type来指定交换机本身的类型

创建生产端
```java
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 测试延迟插件
*/
@Test
void testDelayPlugin() {
// 创建消息后置处理器
MessagePostProcessor messagePostProcessor=message -> {
//安装插件后 x-delayed-message-exchange 才能识别x-delay 参数
message.getMessageProperties().setHeader("x-delay",5000); //过期时间 单位毫秒
return message;
};
//发送消息
rabbitTemplate.convertAndSend("exchange.delay",
"delay.routingkey",
"delayPlugin message"+ new SimpleDateFormat("HH:mm:ss").format(new Date()),
messagePostProcessor
);
}
```
创建消费者
```java
@RabbitListener(queues = {"queue.delay"})
public void processDelayPluginMessage(String data,Message message, Channel channel) throws IOException {
System.out.println("消息内容是:"+data);
System.out.println("当前时间是:"+new SimpleDateFormat("HH:mm:ss").format(new Date()));
//确认消
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
```

#### 注意点
