# rabbitMq **Repository Path**: gjlearn/rabbit-mq ## Basic Information - **Project Name**: rabbitMq - **Description**: 尚硅谷rabbitMq学习笔记 - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-09-22 - **Last Updated**: 2025-09-27 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # rabbitMq学习笔记 * 跟随最新的视频学习的 ## 为什么需要消息队列 ![image-20250922200419813](assets/image-20250922200419813.png) ![image-20250922200853895](assets/image-20250922200853895.png) ### 响应的速度 !(assets/image-20250922200958412.png) ![image-20250922201043170](assets/image-20250922201043170.png) ### 对并发流量的控制 ![image-20250922201146133](assets/image-20250922201146133.png) ![image-20250922201325131](assets/image-20250922201325131.png) ### 削峰限流的好处 ![image-20250922201605895](assets/image-20250922201605895.png) ### 符合开闭原则 * 对扩展打开, 对修改关闭 ![image-20250922201758469](assets/image-20250922201758469.png) ### 总结 ![image-20250922201844137](assets/image-20250922201844137.png) ## 什么是消息队列 ![image-20250922201912974](assets/image-20250922201912974.png) ### 消息队列实现的主流方式 ![image-20250922202005367](assets/image-20250922202005367.png) ![image-20250922202014273](assets/image-20250922202014273.png) ![image-20250922202019596](assets/image-20250922202019596.png) ## rabbitMq引入 ![image-20250922202051080](assets/image-20250922202051080.png) ### rabbitMq体系结构 #### Producer * 消息的发送端, 消息的生产者 #### consumer * 消息的接受端, 消息的消费者 #### Virtual Host * 每一个Virtual Host就是一个虚拟分组,用户在自己的Virtual Host中使用RabbitMQ组件 在实际开发中,通过Virtual Host区分不同项目、不同功能 #### Exchange * 交换机,只负责转发,不存储消息,是消息达到Broker的第一站。注意:Exchange(交换机)**只负责转发**消息,**不具备存储**消息的能力,因 此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那 么消息会**丢失**! ##### 交换机的类型 ###### Fanout * 广播,将消息发送给所有绑定到交换机的队列 ###### Driect * 定向,把消息交给符合指定routing key的队列,**Work Queues工作模式 用的是 direct 类型的交换机** * 如果你不声明,RabbitMQ 会使用一个 **默认的 nameless direct exchange(空字符串 “”)**。 * 当你调用 channel.basicPublish("", "queue_name", ...) 时,实际上就是把消息通过默认的 direct 交换机路由到指定队列。 ###### topic * 通配符,把消息交给符合routing pattern(路由模式)的队列 ![image-20250923102531327](assets/image-20250923102531327.png) * 举例 ![image-20250923102731331](assets/image-20250923102731331.png) ![image-20250923102745377](assets/image-20250923102745377.png) ![image-20250923103241725](assets/image-20250923103241725.png) #### queue * 队列,是消息的容器。消息放在这里等待被消费端取走 #### 工作图 ![image-20250922202923352](assets/image-20250922202923352.png) ### docker安装rabbitMq并测试 ```bash # 拉取镜像 docker pull rabbitmq:3.13-management # -d 参数:后台运行 Docker 容器 # --name 参数:设置容器名称 # -p 参数:映射端口号,格式是“宿主机端口号:容器内端口号”。5672供客户端程序访问,15672供后台管理界面访问 # -v 参数:卷映射目录 # -e 参数:设置容器内的环境变量,这里我们设置了登录RabbitMQ管理后台的默认用户和密码 docker run -d \ --name rabbitmq \ -p 5672:5672 \ -p 15672:15672 \ -v rabbitmq-plugin:/plugins \ -e RABBITMQ_DEFAULT_USER= yourUsername \ -e RABBITMQ_DEFAULT_PASS= yourPassword \ rabbitmq:3.13-management ``` 访问后台管理界面:http://ip:15672。 用自己设置的账号密码登录 ![image-20250922203858844](assets/image-20250922203858844.png) ## 入门案例 [HelloRabbitMq](HelloRabbitMq) 这个模块 ### 工作方式 (重点) • RabbitMQ官网,通过教程的形式,给我们列举了7种RabbitMQ用法 • 网址:https://www.rabbitmq.com/getstarted.html #### Work Queues * Work Queues 用的是 direct 类型的交换机 ![image-20250923100937798](assets/image-20250923100937798.png) #### Publish/Subscribe * Publish:发布,把消息发送到交换机上 * Subscribe:订阅,只要把队列和交换机绑定,事实上就形成了一种订阅关系 * 生产者不是把消息直接发送到队列,而是发送到交换机 * 交换机接收消息,而如何处理消息取决于交换机的类型 * 工作机制:消息发送到交换机上,就会以**广播**的形式发送给所有已绑定队列 ![image-20250923102003796](assets/image-20250923102003796.png) #### Routing * 路由模式取决于交换机的类型 , 见交换机的类型 ### 工作模式小结 • 直接发送到队列:底层使用了默认交换机 • 经过交换机发送到队列 • Fanout:没有Routing key直接绑定队列 • Direct:通过Routing key绑定队列,消息发送到绑定的队列上 ​ • 一个交换机绑定一个队列:定点发送 ​ • 一个交换机绑定多个队列:广播发送 • Topic:针对Routing key使用通配符 ## springBoot中使用rabbitMQ ### 1、导入依赖 ```xml org.springframework.boot spring-boot-starter-parent 3.1.5 org.springframework.boot spring-boot-starter-amqp org.springframework.boot spring-boot-starter-web org.projectlombok lombok ``` ### 2、写一个启动类 ```java @SpringBootApplication public class ConsumerApplication { public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class,args); } } ``` ### 3、写一个消费者consumer ```java @Component public class MyMessageListener { //交换机 public static final String EXCHANGE_DIRECT = "exchange01"; //路由键 public static final String ROUTING_KEY = "A"; //队列名称 public static final String QUEUE_NAME = "chen"; // 写法一可以创建路由队列和他们之间的绑定关系并监听 // @RabbitListener( // bindings = @QueueBinding(value = @Queue(value = QUEUE_NAME, durable = "true"), //// exchange = @Exchange(value = EXCHANGE_DIRECT), // exchange = @Exchange(EXCHANGE_DIRECT), //就一个value属性可以省略value=xx // key = {ROUTING_KEY} // ) // ) //写法二不需要创建, 只需要指定要监听的队列即可. 提起是已经都建立好 @RabbitListener(queues = {QUEUE_NAME}) public void proceedMessage(String data, Message message, Channel channel) { System.out.println("消费端接收到了消息"+data); } } ``` ![image-20250923144122789](assets/image-20250923144122789.png) ![image-20250923144153376](assets/image-20250923144153376.png) ![image-20250923144427207](assets/image-20250923144427207.png) * 设置的值在控制台的显示的地方 ![image-20250923144012360](assets/image-20250923144012360.png) ![image-20250923144037879](assets/image-20250923144037879.png) ![image-20250923144910957](assets/image-20250923144910957.png) ### 4、写一个生产者 producer , 发送消息 新建某块,导入依赖, 写测试类、 ```xml org.springframework.boot spring-boot-starter-parent 3.1.5 org.springframework.boot spring-boot-starter-amqp org.springframework.boot spring-boot-starter-test ``` applicaiton.yml ```yml spring: rabbitmq: host: 172.20.10.10 port: 5672 username: guest password: 123456 virtual-host: / ``` 测试类 ```java @SpringBootTest public class TestRabbit { //交换机 public static final String EXCHANGE_DIRECT = "exchange01"; //路由键 public static final String ROUTING_KEY = "A"; @Autowired private RabbitTemplate rabbitTemplate; @Test void testSend() { // 根据 交换机和路由键就可以定位到 队列 rabbitTemplate.convertAndSend(EXCHANGE_DIRECT, ROUTING_KEY, "hello rabbit, I am springBoot"); } } ``` 5、 运行测试类,看到消费者接收到消息 ​ ![image-20250923151628152](assets/image-20250923151628152.png) ## rabbitMQ使用过程中可以遇到的问题 ### 故障情况1: 消息没有发送到消息队列(producer端配置) #### 解决思路A(**生产者端**进行**确认**) * 在**生产者端**进行**确认**,具体操作中我们会分别针对**交换机**和**队列**来确认, 如果没有成功发送到消息队列服务器上,那就可以尝试重新发送 #### 1、 在配置文件中开启确认 ```xml spring: rabbitmq: host: 172.20.10.10 port: 5672 username: guest password: 123456 virtual-host: / # 必须加入下面两个配置,才能开启确认 publisher-confirm-type: CORRELATED # 交换机的确认 correlate publisher-returns: true # 队列的确认 ``` #### 2、 创建配置类 在这里我们为什么要创建这个配置类呢?首先,我们需要声明回调函数来接收RabbitMQ服务器返回的确认信息: | 方法名 | 方法功能 | 所属接口 | 接口所属类 | | ----------------- | ------------------------ | --------------- | -------------- | | confirm() | 确认消息是否发送到交换机 | ConfirmCallback | RabbitTemplate | | returnedMessage() | 确认消息是否发送到队列 | ReturnsCallback | RabbitTemplate | 然后,就是对RabbitTemplate的功能进行增强,因为回调函数所在对象必须设置到RabbitTemplate对象中才能生效。 原本RabbitTemplate对象并没有生产者端消息确认的功能,要给它设置对应的组件才可以。 而设置对应的组件,需要调用RabbitTemplate对象下面两个方法: | 设置组件调用的方法 | 所需对象类型 | | -------------------- | ----------------------- | | setConfirmCallback() | ConfirmCallback接口类型 | | setReturnCallback() | ReturnCallback接口类型 | * 配置类 ```java @Configuration public class RabbitMQACJConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback { // 第二部, 设置 @Autowired private RabbitTemplate rabbitTemplate; // 创建对象完成后,自动调用这个方法 @PostConstruct public void init() { // 把我们自定义的方法设置进去 rabbitTemplate.setConfirmCallback(this); // 路由器 rabbitTemplate.setReturnsCallback(this); //队列 } // 第一步先重写两个回掉方法 /** * 消息发送成功、失败发送到交换机时,调用的方法 correlationData – correlation data for the callback. ack – true for ack, false for nack(no acknowledge) cause – An optional cause, for nack, when available, otherwise null. */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { //soutp+enter 快速打印参数 System.out.println("correlationData = " + correlationData + ", ack = " + ack + ", cause = " + cause); } /** * 发送到队列失败, 调用这个方法 * @param returned the returned message and metadata. */ @Override public void returnedMessage(ReturnedMessage returned) { System.out.println("消息主体: " + new String(returned.getMessage().getBody())); System.out.println("应答码: " + returned.getReplyCode()); System.out.println("描述:" + returned.getReplyText()); System.out.println("消息使用的交换器 exchange : " + returned.getExchange()); System.out.println("消息使用的路由键 routing : " + returned.getRoutingKey()); } } ``` 配置类自身实现ConfirmCallback、ReturnCallback这两个接口,然后通过this指针把配置类的对象设置到RabbitTemplate对象中。 操作封装到了一个专门的void init()方法中。 为了保证这个void init()方法在应用启动时被调用,我们使用@PostConstruct注解来修饰这个方法。 关于@PostConstruct注解大家可以参照以下说明: > @PostConstruct注解是Java中的一个标准注解,它用于指定在对象创建之后立即执行的方法。当使用依赖注入(如Spring框架)或者其他方式创建对象时,@PostConstruct注解可以确保在对象完全初始化之后,执行相应的方法。 > > 使用@PostConstruct注解的方法必须满足以下条件: > > 1. 方法不能有任何参数。 > 2. 方法必须是非静态的。 > 3. 方法不能返回任何值。 > > 当容器实例化一个带有@PostConstruct注解的Bean时,它会在调用构造函数之后,并在依赖注入完成之前调用被@PostConstruct注解标记的方法。这样,我们可以在该方法中进行一些初始化操作,比如读取配置文件、建立数据库连接等。 #### 3 、测试 之前rabbitTemplate怎么用,现在还怎么用 ##### 成功的情况 ```java @Test void testConfirm() { rabbitTemplate.convertAndSend(EXCHANGE_DIRECT, ROUTING_KEY, "confirm message"); } ``` ![image-20250923161049577](assets/image-20250923161049577.png) ![image-20250923161600125](assets/image-20250923161600125.png) ##### 路由不对的情况 ```java @Test void testConfirm() { rabbitTemplate.convertAndSend(EXCHANGE_DIRECT+"chen", ROUTING_KEY, "confirm message"); } ``` ![image-20250923162339239](assets/image-20250923162339239.png) ##### 路由键没找到的情况 ```java @Test void testConfirm() { rabbitTemplate.convertAndSend(EXCHANGE_DIRECT, ROUTING_KEY+"chen", "confirm message"); } ``` ![image-20250923162523306](assets/image-20250923162523306.png) #### 解决思路B:备份交换机 我改了一下名称方便记忆 ```java //交换机 public static final String EXCHANGE_DIRECT = "exchange1"; //路由键 public static final String ROUTING_KEY = "routingKey1"; //队列名称 public static final String QUEUE_NAME = "queue1"; ``` * 为目标交换机指定**备份交换机**,当目标交换机投递失败时,把消息投递至 备份交换机 ![image-20250924095550093](assets/image-20250924095550093.png) * 用控制面板创建备份交换机器 **backupExchange01** ![image-20250924102900024](assets/image-20250924102900024.png) ![a](assets/image-20250924103109673.png) ![image-20250924103250764](assets/image-20250924103250764.png) ![image-20250924103351469](assets/image-20250924103351469.png) ![image-20250924103454570](assets/image-20250924103454570.png) ![image-20250924103552570](assets/image-20250924103552570.png) ##### 测试 ```java @Test void testConfirm() { // 不能改交换机的名字, 因为备份交换机是和交换机绑定的 rabbitTemplate.convertAndSend(EXCHANGE_DIRECT, ROUTING_KEY+"aa", "confirm message"); } ``` ![image-20250924103859744](assets/image-20250924103859744.png) ![image-20250924103941257](assets/image-20250924103941257.png) ### 故障情况2:消息队列服务器宕机导致内存中消息丢失 * 解决思路:**消息持久化**到硬盘上,哪怕服务器重启也不会导致消息丢失 #### 测试 默认就是持久话的 1、重启rabbitMQ ```bash docker restart 容器名 ``` ![image-20250924104931242](assets/image-20250924104931242.png) ![image-20250924105249683](assets/image-20250924105249683.png) ![image-20250924105311146](assets/image-20250924105311146.png) ### 故障情况3: 消费端宕机或抛异常导致消息没有成功被消费 ![image-20250924110345924](assets/image-20250924110345924.png) 创建新模块, 只有配置文件不同 ```yml spring: rabbitmq: host: 172.20.10.10 port: 5672 username: guest password: 123456 virtual-host: / listener: simple: acknowledge-mode: manual # 把消息确认模式改为手动确认 ``` ```java @Component public class ConfirmRabbitListener { public static final String QUEUE_Name = "queue1"; @RabbitListener(queues = QUEUE_Name) public void processMessage(String data, Message message, Channel channel) throws IOException { //获取 deliveryTag 消息的唯一标识 long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { //核心业务 System.out.println(data); //成功了返回AKC //multiple: 是否批量处理数据 channel.basicAck(deliveryTag, false); } catch (Exception e) { //时候被重新投递过 Boolean redelivered = message.getMessageProperties().getRedelivered(); if (redelivered) { //requeue true, broker会重新投放消息 // false broker会丢掉这个消息 //被重新投递过直接丢弃 channel.basicNack(deliveryTag, false, false); } else { //失败了返回NACK // 没被重新投递过重新投递 channel.basicNack(deliveryTag, false, true); } } } } ``` ![image-20250924152133080](assets/image-20250924152133080.png) ##### deliveryTag ![image-20250924153711042](assets/image-20250924153711042.png) ![image-20250924153736865](assets/image-20250924153736865.png) ##### multiple ![image-20250924153814084](assets/image-20250924153814084.png) ##### 控制台发送消息测试 ![image-20250924153014616](assets/image-20250924153014616.png) ##### 没有异常的情况 ![image-20250924153102062](assets/image-20250924153102062.png) ##### 有异常的情况 ![image-20250924153520793](assets/image-20250924153520793.png) ![image-20250924153447549](assets/image-20250924153447549.png) ### 消费端限流 ![image-20250925094724466](assets/image-20250925094724466.png) ![image-20250925094735826](assets/image-20250925094735826.png) ![image-20250925094401628](assets/image-20250925094401628.png) ```java public static final String QUEUE_Name = "queue1"; /** * 测试precache */ @RabbitListener(queues = QUEUE_Name) public void processMessageTestPreCatch(String data, Message message, Channel channel) throws IOException, InterruptedException { //让处理的速度慢一点方便观察 TimeUnit.SECONDS.sleep(1); // 还是采用手动确认 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } ``` ![image-20250925094148099](assets/image-20250925094148099.png) 开启prefetch ![image-20250925094241911](assets/image-20250925094241911.png) ![image-20250925094609521](assets/image-20250925094609521.png) ### 消息超时 #### 设置队列全局超时的时间 * 给**消息**设定一个**过期时间**,超过这个时间没有被取走的消息就会被**删除** * • **队列层面**:在队列层面设定**消息的过期时间**,并不是队列的过期时间。意思是这 个队列中的消息全部使用**同一个**过期时间。 ![image-20250925095507830](assets/image-20250925095507830.png) ![image-20250925100111544](assets/image-20250925100111544.png) ![image-20250925100308566](assets/image-20250925100308566.png) ```java @Autowired private RabbitTemplate rabbitTemplate; @Test void testTimeOut() { for (int i = 0; i < 100; i++) { rabbitTemplate.convertAndSend("timeout.exchange","timeout.routingKey" ,"testTimeout"); } } ``` 把消费者停掉 ![image-20250925100523182](assets/image-20250925100523182.png) #### MessagePostProcessor设置具体某个消息的过期时间 • **消息本身**:给具体的某个消息设定过期时间 ```java @Test void testTimeOutPostProcesser() { MessagePostProcessor messagePostProcessor = message -> { message.getMessageProperties().setExpiration("6000"); //单位也是毫秒 return message; }; rabbitTemplate.convertAndSend("timeout.exchange","timeout.routingKey" ,"testTimeout",messagePostProcessor); } ``` ![image-20250925101720974](assets/image-20250925101720974.png) ### 死信和死信队列 * 概念:当一个消息无法被消费,它就变成了死信。 #### 死信产生的的原因 ![image-20250925103249168](assets/image-20250925103249168.png) #### 测试. 消息由正常队列进入死信队列 * 先创建死信和信息队列 和对应的路由 (被动接受消息) ![image-20250925103446400](assets/image-20250925103446400.png) ![image-20250925103504276](assets/image-20250925103504276.png) ![image-20250925103551057](assets/image-20250925103551057.png) * 创建正常的交换机和队列 (遇到问题转入到死信队列) ![image-20250925104636242](assets/image-20250925104636242.png) ![image-20250925104150456](assets/image-20250925104150456.png) ![image-20250925104702477](assets/image-20250925104702477.png) ![image-20250925110127923](assets/image-20250925110127923.png) ![image-20250925110142272](assets/image-20250925110142272.png) ### 消息数量超过队列容纳极限 * 发送消息前, 先把消费端停掉 ``` /** * 测试超时,进入死信队列 */ @Test public void testSendMultiMessage() { for (int i = 0; i < 20; i++) { rabbitTemplate.convertAndSend( "normal.exchange", "normal.routingkey", "测试死信情况2:消息数量超过队列的最大容量" + i); } } ``` ![image-20250926080827210](assets/image-20250926080827210.png) 官网地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange 确定卷映射目录 ```bash docker inspect 容器名 ``` ![image-20250927074310889](assets/image-20250927074310889.png) ## 延迟队列 * 应用场景 * ![image-20250927084713590](assets/image-20250927084713590.png) ### 1、 基于死信的延迟队列 ![image-20250927084745016](assets/image-20250927084745016.png) ### 2、延迟插件 #### 下载和安装 官方文档说明页地址:https://www.rabbitmq.com/community-plugins.html ![image-20231107180045135](assets/image-20231107180045135.png) 下载插件安装文件: ```shell # cd 进入到宿主机对应插件的目录 wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez ``` ```bash # 登录进入容器内部 docker exec -it rabbitmq /bin/bash # rabbitmq-plugins命令所在目录已经配置到$PATH环境变量中了,可以直接调用 rabbitmq-plugins enable rabbitmq_delayed_message_exchange # 退出Docker容器 exit # 重启Docker容器 docker restart rabbitmq # 看看是否启用 rabbitmq-plugins list ``` ![image-20250927074920826](assets/image-20250927074920826.png) ![image-20250927075458843](assets/image-20250927075458843.png) ![image-20250927075534241](assets/image-20250927075534241.png) ![image-20250927075555063](assets/image-20250927075555063.png) #### 创建基于插件的队列和使用 ![image-20250927080557649](assets/image-20250927080557649.png) 关于x-delayed-type参数的理解: > 原本指定交换机类型的地方使用了x-delayed-message这个值,那么这个交换机除了支持延迟消息之外,到底是direct、fanout、topic这些类型中的哪一个呢? > > 这里就额外使用x-delayed-type来指定交换机本身的类型 ![image-20250927080759129](assets/image-20250927080759129.png) 创建生产端 ```java @Autowired private RabbitTemplate rabbitTemplate; /** * 测试延迟插件 */ @Test void testDelayPlugin() { // 创建消息后置处理器 MessagePostProcessor messagePostProcessor=message -> { //安装插件后 x-delayed-message-exchange 才能识别x-delay 参数 message.getMessageProperties().setHeader("x-delay",5000); //过期时间 单位毫秒 return message; }; //发送消息 rabbitTemplate.convertAndSend("exchange.delay", "delay.routingkey", "delayPlugin message"+ new SimpleDateFormat("HH:mm:ss").format(new Date()), messagePostProcessor ); } ``` 创建消费者 ```java @RabbitListener(queues = {"queue.delay"}) public void processDelayPluginMessage(String data,Message message, Channel channel) throws IOException { System.out.println("消息内容是:"+data); System.out.println("当前时间是:"+new SimpleDateFormat("HH:mm:ss").format(new Date())); //确认消 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } ``` ![image-20250927084331156](assets/image-20250927084331156.png) #### 注意点 ![image-20250927084440132](assets/image-20250927084440132.png)