# rabbitmq-learn **Repository Path**: fuangyer/rabbitmq-learn ## Basic Information - **Project Name**: rabbitmq-learn - **Description**: 黑马RabbitMQ学习课堂代码 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2022-09-09 - **Last Updated**: 2022-09-09 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README [toc] ## MQ 的基本概念 ### 1.1 MQ概述 **MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器**。多用于分布式系统之间进行通信。 image-20220516081045650 image-20220516081105302 **小结** - MQ,消息队列,存储消息的中间件 - 分布式系统通信两种方式:直接远程调用 和 借助第三方 完成间接通信 - 发送方称为生产者,接收方称为消费者 ------ 消息队列中间件 (Message Queue Middleware,简称 MQ) 是指利用高效可靠的消息传递机制进行与平台无关的数据交流,它可以在分布式环境下扩展进程间的数据通信,并基于数据通信来进行分布式系统的集成。它主要适用于以下场景: - **项目解耦**:不同的项目或模块可以使用消息中间件进行数据的传递,从而可以保证模块的相对独立,实现解耦。 - **流量削峰**:可以将突发的流量 (如秒杀数据) 写入消息中间件,然后由多个消费者进行异步处理。 - **弹性伸缩**:可以通过对消息中间件进行横向扩展来提高系统的处理能力和吞吐量。 - **发布订阅**:可以用于任意的发布订阅模式中。 - **异步处理**:当我们不需要对数据进行立即处理,或者不关心数据的处理结果时,可以使用中间件进行异步处理。 - **冗余存储**:消息中间件可以对数据进行持久化存储,直到你消费完成后再进行删除。 ### 1.2 MQ 的优势和劣势 优势: - 应用解耦 :提高系统容错性和可维护性 - 异步提速 :提升用户体验和系统吞吐量 - 削峰填谷:提高系统稳定性 image-20220516081326306 image-20220516081409087 image-20220516081425540 image-20220516081442032 image-20220516081500917 image-20220516081515550 image-20220516081537756 劣势: - 系统可用性降低 - 系统复杂度提高 - 一致性问题 image-20220516081754504 **小结** 既然 MQ 有优势也有劣势,那么使用 MQ 需要满足什么条件呢? ① 生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明明下层的动作还没做,上层却当成动作做完了继续往后走,即所谓异步成为了可能。 ② 容许短暂的不一致性。 ③ 确实是用了有效果。即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本。 ### 1.5 常见的 MQ 产品 image-20220516082032211 ### 1.6 RabbitMQ 简介 **AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议**的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中 间件不同产品,不同的开发语言等条件的限制。2006年,AMQP 规范发布。类比HTTP。 #### AMQP协议 AMQP (Advanced Message Queuing Protocol) 是一个提供统一消息服务的应用层通讯协议,为消息中间件提供统一的开发规范。不同客户端可以将消息投递到中间件上,或从上面获取消息;发送消息和接收消息的客户端可以采用不同的语言开发、不同的技术实现,但必须遵循相同的 AMQP 协议。AMQP 协议本身包括以下三层: - **Module Layer**:位于协议最高层,主要定义了一些供客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑。例如:可以使用 Queue.Declare 命令声明一个队列或者使用 Basic.Consume 订阅消费一个队列中的消息。 - **Session Layer**:位于中间层,主要负责将客户端的命令发送给服务器,再将服务端的应答返回给客户端,主要为客户端与服务器之间的通信提供可靠性同步机制和错误处理。 - **Transport Layer**:位于最底层,主要传输二进制数据流 ,提供帧的处理、信道复用、错误检测和数据表示等。 > 交换机分发消息,队列存储消息 image-20220516082227851 RabbitMQ 基础架构如下图: image-20220516082329690 **RabbitMQ 中的相关概念:** ⚫ **Broker**:接收和分发消息的应用,RabbitMQ Server就是 Message Broker ⚫ **Virtual host**:出于多租户和安全因素设计的,**把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网 络中的 namespace 概念**。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多 个vhost,**每个用户在自己的 vhost 创建 exchange/queue 等** ⚫ **Connection**:publisher/consumer 和 broker 之间的 TCP 连接 ⚫ **Channel**:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。**Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线 程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的**。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销 ⚫ **Exchange**:message 到达 broker 的第一站,**根据分发规则,匹配查询表中的 routing key,分发消息到**queue 中去**。常用的类型有:**direct (point-to-point), topic (publish-subscribe) and fanout (multicast)** ⚫ **Queue**:消息最终被送到这里等待 consumer 取走 ⚫ **Binding**:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据 > ### 1. Publisher(发布者) > > 发布者 (或称为生产者) 负责生产消息并将其投递到指定的交换器上。 > > ### 2. Message(消息) > > 消息由消息头和消息体组成。消息头用于存储与消息相关的元数据:如目标交换器的名字 (exchange_name) 、路由键 (RountingKey) 和其他可选配置 (properties) 信息。消息体为实际需要传递的数据。 > > ### 3. Exchange(交换器) > > 交换器负责接收来自生产者的消息,并将将消息路由到一个或者多个队列中,如果路由不到,则返回给生产者或者直接丢弃,这取决于交换器的 mandatory 属性: > > - 当 mandatory 为 true 时:如果交换器无法根据自身类型和路由键找到一个符合条件的队列,则会将该消息返回给生产者; > - 当 mandatory 为 false 时:如果交换器无法根据自身类型和路由键找到一个符合条件的队列,则会直接丢弃该消息。 > > ### 4. BindingKey (绑定键) > > 交换器与队列通过 BindingKey 建立绑定关系。 > > ### 5. Routingkey(路由键) > > 生产者将消息发给交换器的时候,一般会指定一个 RountingKey,用来指定这个消息的路由规则。当 RountingKey 与 BindingKey 基于交换器类型的规则相匹配时,消息被路由到对应的队列中。 > > ### 6. Queue(消息队列) > > 用于存储路由过来的消息。多个消费者可以订阅同一个消息队列,此时队列会将收到的消息将以轮询 (round-robin) 的方式分发给所有消费者。即每条消息只会发送给一个消费者,不会出现一条消息被多个消费者重复消费的情况。 > > ### 7. Consumer(消费者) > > 消费者订阅感兴趣的队列,并负责消费存储在队列中的消息。为了保证消息能够从队列可靠地到达消费者,RabbitMQ 提供了消息确认机制 (message acknowledgement),并通过 autoAck 参数来进行控制: > > - 当 autoAck 为 true 时:此时消息发送出去 (写入TCP套接字) 后就认为消费成功,而不管消费者是否真正消费到这些消息。当 TCP 连接或 channel 因意外而关闭,或者消费者在消费过程之中意外宕机时,对应的消息就丢失。因此这种模式可以提高吞吐量,但会存在数据丢失的风险。 > - 当 autoAck 为 false 时:需要用户在数据处理完成后进行手动确认,只有用户手动确认完成后,RabbitMQ 才认为这条消息已经被成功处理。这可以保证数据的可靠性投递,但会降低系统的吞吐量。 > > ### 8. Connection(连接) > > 用于传递消息的 TCP 连接。 > > ### 9. Channel(信道) > > RabbitMQ 采用类似 NIO (非阻塞式 IO ) 的设计,通过 Channel 来复用 TCP 连接,并确保每个 Channel 的隔离性,就像是拥有独立的 Connection 连接。当数据流量不是很大时,采用连接复用技术可以避免创建过多的 TCP 连接而导致昂贵的性能开销。 > > ### 10. Virtual Host(虚拟主机) > > RabbitMQ 通过虚拟主机来实现逻辑分组和资源隔离,一个虚拟主机就是一个小型的 RabbitMQ 服务器,拥有独立的队列、交换器和绑定关系。用户可以按照不同业务场景建立不同的虚拟主机,虚拟主机之间是完全独立的,你无法将 vhost1 上的交换器与 vhost2 上的队列进行绑定,这可以极大的保证业务之间的隔离性和数据安全。默认的虚拟主机名为 `/` 。 > > ### 11. Broker > > 一个真实部署运行的 RabbitMQ 服务。 ==RabbitMQ 提供了 6 种工作模式==:**简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式**(远程调用,不太算 MQ;暂不作介绍)。 官网对应模式介绍:https://www.rabbitmq.com/getstarted.html image-20220516082905945 ### 1.7 JMS - JMS 即 **Java 消息服务**(JavaMessage Service)应用程序接口,**是一个 Java 平台中关于面向消息中间件 的API** - JMS 是 JavaEE 规范中的一种,类比JDBC - 很多消息中间件都实现了JMS规范,例如:ActiveMQ。RabbitMQ 官方没有提供 JMS 的实现包,但是开源社区有 **小结** 1. RabbitMQ 是基于 AMQP 协议使用 Erlang 语言开发的一款消息队列产品。 2. RabbitMQ提供了6种工作模式,我们学习5种。这是今天的重点。 3. AMQP 是协议,类比HTTP。 4. JMS 是 API 规范接口,类比 JDBC。 ## RabbitMQ 的安装和配置 https://xie.infoq.cn/article/7d22852e219f110ecd195b4dd https://juejin.cn/post/6844903970545090574 ```sh docker pull rabbitmq:management //下载RabbitMQ镜像 docker run --name rabbit --restart=always -p 15672:15672 -p 5672:5672 -d rabbitmq:management //启动RabbitMQ,默认guest用户,密码也是guest。 ``` ## RabbitMQ 快速入门 ### 3.1 入门程序 需求:使用简单模式完成消息传递 步骤: 1. 创建工程(生成者、消费者) 2. 分别添加依赖 3. 编写生产者发送消息 4. 编写消费者接收消息 依赖: ```xml com.rabbitmq amqp-client 5.14.2 org.slf4j slf4j-log4j12 1.7.35 ``` 生产者程序Producer_HelloWorld: ```java public class Producer_HelloWorld { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2. 设置参数 factory.setHost("192.168.64.12");//ip 默认值 localhost factory.setPort(5672); //端口 默认值 5672 factory.setVirtualHost("/example");//虚拟机 默认值/ factory.setUsername("giaming");//用户名 默认 guest factory.setPassword("giaming");//密码 默认值 guest //3. 创建连接 Connection Connection connection = factory.newConnection(); //4. 创建Channel Channel channel = connection.createChannel(); //5. 创建队列Queue /* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) 参数: 1. queue:队列名称 2. durable:是否持久化,当mq重启之后,还在 3. exclusive: * 是否独占。只能有一个消费者监听这队列 * 当Connection关闭时,是否删除队列 * 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉 5. arguments:参数。 */ //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建 channel.queueDeclare("hello_world",true,false,false,null); /* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) 参数: 1. exchange:交换机名称。简单模式下交换机会使用默认的 "" 2. routingKey:路由名称 3. props:配置信息 4. body:发送消息数据 */ String body = "hello rabbitmq~~~"; //6. 发送消息 channel.basicPublish("","hello_world",null,body.getBytes()); //7.释放资源 channel.close(); connection.close(); } } ``` 消费者Consumer_HelloWorld: ```java public class Consumer_HelloWorld { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2. 设置参数 factory.setHost("192.168.64.12");//ip 默认值 localhost factory.setPort(5672); //端口 默认值 5672 factory.setVirtualHost("/example");//虚拟机 默认值/ factory.setUsername("giaming");//用户名 默认 guest factory.setPassword("giaming");//密码 默认值 guest //3. 创建连接 Connection Connection connection = factory.newConnection(); //4. 创建Channel Channel channel = connection.createChannel(); //5. 创建队列Queue /* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) 参数: 1. queue:队列名称 2. durable:是否持久化,当mq重启之后,还在 3. exclusive: * 是否独占。只能有一个消费者监听这队列 * 当Connection关闭时,是否删除队列 * 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉 5. arguments:参数。 */ //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建 channel.queueDeclare("hello_world",true,false,false,null); /* basicConsume(String queue, boolean autoAck, Consumer callback) 参数: 1. queue:队列名称 2. autoAck:是否自动确认 3. callback:回调对象 */ // 接收消息 Consumer consumer = new DefaultConsumer(channel){ /* 回调方法,当收到消息后,会自动执行该方法 1. consumerTag:标识 2. envelope:获取一些信息,交换机,路由key... 3. properties:配置信息 4. body:数据 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumerTag:"+consumerTag); System.out.println("Exchange:"+envelope.getExchange()); System.out.println("RoutingKey:"+envelope.getRoutingKey()); System.out.println("properties:"+properties); System.out.println("body:"+new String(body)); } }; channel.basicConsume("hello_world",true,consumer); //关闭资源?不要 } } ``` ### 3.2 小结 上述的入门案例中其实使用的是如下的简单模式: image-20220516092403516 ## RabbitMQ 的工作模式 RabbitMQ 支持多种交换器类型,常用的有以下四种: ### 5.1 fanout 这是最简单的一种交换器模型,此时会把消息路由到与该交换器绑定的所有队列中。如下图,任何发送到 X 交换器上的消息,都会被路由到 Q1 和 Q2 两个队列上。 [![img](https://gitee.com/giaming023/images/raw/master/img/20220516160841.png)](https://camo.githubusercontent.com/a067a9aef6fc0da4ec19e7feae8c1d8edc27efe3aa5e1453dccdcdb2215c6e0b/68747470733a2f2f67697465652e636f6d2f68656962616979696e672f46756c6c2d537461636b2d4e6f7465732f7261772f6d61737465722f70696374757265732f7261626269746d712d66616e6f75742d65786368616e67652e706e67) ### 5.2 direct 把消息路由到 BindingKey 和 RountingKey 完全一样的队列中。如下图,当消息的 RountingKey 为 orange 时,消息会被路由到 Q1 队列;当消息的 RountingKey 为 black 或 green 时,消息会被路由到 Q2 队列。 [![img](https://camo.githubusercontent.com/f112b159c448684a5fb6acef335bc2f85e1321527c37e661458e2b15777bd73f/68747470733a2f2f67697465652e636f6d2f68656962616979696e672f46756c6c2d537461636b2d4e6f7465732f7261772f6d61737465722f70696374757265732f7261626269746d712d6469726563742d65786368616e67652e706e67)](https://camo.githubusercontent.com/f112b159c448684a5fb6acef335bc2f85e1321527c37e661458e2b15777bd73f/68747470733a2f2f67697465652e636f6d2f68656962616979696e672f46756c6c2d537461636b2d4e6f7465732f7261772f6d61737465722f70696374757265732f7261626269746d712d6469726563742d65786368616e67652e706e67) 需要特别说明的是一个交换器绑定多个队列时,它们的 BindingKey 是可以相同的,如下图。此时当消息的 RountingKey 为 black 时,消息会同时被路由到 Q1 和 Q2 队列。 [![img](https://gitee.com/giaming023/images/raw/master/img/20220516160842.png)](https://camo.githubusercontent.com/1b664e0f00df20a835ce8a88a861cf4d52e736a162ad46b5467887ee620c58ab/68747470733a2f2f67697465652e636f6d2f68656962616979696e672f46756c6c2d537461636b2d4e6f7465732f7261772f6d61737465722f70696374757265732f7261626269746d712d6469726563742d65786368616e67652d322e706e67) ### 5.3 topic 将消息路由到 BindingKey 和 RountingKey 相匹配的队列中,匹配规则如下: - RountingKey 和 BindingKey 由多个单词使用逗号 `.` 进行连接; - BindingKey 支持两个特殊符号:`#` 和 `*` 。其中 `*` 用于匹配一个单词, `#` 用于匹配零个或者多个单词。 以下是官方文档中的示例,交换器与队列的绑定情况如图所示,此时的路由情况如下: [![img](https://gitee.com/giaming023/images/raw/master/img/20220516160843.png)](https://camo.githubusercontent.com/40ff5d4c698a93f806d18997c8256adc31a0797835cd1b05f35445ad5a915397/68747470733a2f2f67697465652e636f6d2f68656962616979696e672f46756c6c2d537461636b2d4e6f7465732f7261772f6d61737465722f70696374757265732f7261626269746d712d746f7069632d65786368616e67652e706e67) - 路由键为 `lazy.orange.elephant` 的消息会发送给所有队列; - 路由键为 `quick.orange.fox` 的消息只会发送给 Q1 队列; - 路由键为 `lazy.brown.fox` 的消息只会发送给 Q2 队列; - 路由键为 `lazy.pink.rabbit` 的消息只会发送给 Q2 队列; - 路由键为 `quick.brown.fox` 的消息与任何绑定都不匹配; - 路由键为 `orange` 或 `quick.orange.male.rabbit` 的消息也与任何绑定都不匹配。 ### 5.4 headers 在交换器与队列进行绑定时可以指定一组键值对作为 BindingKey;在发送消息的 headers 中的可以指定一组键值对属性,当这些属性与 BindingKey 相匹配时,则将消息路由到该队列。同时还可以使用 `x-match` 参数指定匹配模式: - **x-match = all** :所有的键值对都相同才算匹配成功; - **x-match = any**:只要有一个键值对相同就算匹配成功。 headers 类型的交换器性能比较差,因此其在实际开发中使用得比较少。 ### 4.1 Work queues 工作队列模式 image-20220516092503830 生产者: ```java public class Producer_WorkQueues { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2. 设置参数 factory.setHost("192.168.64.12");//ip 默认值 localhost factory.setPort(5672); //端口 默认值 5672 factory.setVirtualHost("/example");//虚拟机 默认值/ factory.setUsername("giaming");//用户名 默认 guest factory.setPassword("giaming");//密码 默认值 guest //3. 创建连接 Connection Connection connection = factory.newConnection(); //4. 创建Channel Channel channel = connection.createChannel(); //5. 创建队列Queue /* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) 参数: 1. queue:队列名称 2. durable:是否持久化,当mq重启之后,还在 3. exclusive: * 是否独占。只能有一个消费者监听这队列 * 当Connection关闭时,是否删除队列 * 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉 5. arguments:参数。 */ //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建 channel.queueDeclare("work_queues",true,false,false,null); /* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) 参数: 1. exchange:交换机名称。简单模式下交换机会使用默认的 "" 2. routingKey:路由名称 3. props:配置信息 4. body:发送消息数据 */ for (int i = 1; i <= 10; i++) { String body = i+"hello rabbitmq~~~"; //6. 发送消息 channel.basicPublish("","work_queues",null,body.getBytes()); } //7.释放资源 channel.close(); connection.close(); } } ``` 消费者(运行两个程序): ```java public class Consumer_WorkQueues1 { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2. 设置参数 factory.setHost("192.168.64.12");//ip 默认值 localhost factory.setPort(5672); //端口 默认值 5672 factory.setVirtualHost("/example");//虚拟机 默认值/ factory.setUsername("giaming");//用户名 默认 guest factory.setPassword("giaming");//密码 默认值 guest //3. 创建连接 Connection Connection connection = factory.newConnection(); //4. 创建Channel Channel channel = connection.createChannel(); //5. 创建队列Queue /* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) 参数: 1. queue:队列名称 2. durable:是否持久化,当mq重启之后,还在 3. exclusive: * 是否独占。只能有一个消费者监听这队列 * 当Connection关闭时,是否删除队列 * 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉 5. arguments:参数。 */ //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建 channel.queueDeclare("work_queues",true,false,false,null); /* basicConsume(String queue, boolean autoAck, Consumer callback) 参数: 1. queue:队列名称 2. autoAck:是否自动确认 3. callback:回调对象 */ // 接收消息 Consumer consumer = new DefaultConsumer(channel){ /* 回调方法,当收到消息后,会自动执行该方法 1. consumerTag:标识 2. envelope:获取一些信息,交换机,路由key... 3. properties:配置信息 4. body:数据 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { /* System.out.println("consumerTag:"+consumerTag); System.out.println("Exchange:"+envelope.getExchange()); System.out.println("RoutingKey:"+envelope.getRoutingKey()); System.out.println("properties:"+properties);*/ System.out.println("queue1 body:"+new String(body)); } }; channel.basicConsume("work_queues",true,consumer); //关闭资源?不要 } } ``` image-20220516112811373 **小结** 1. 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。 2. **Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速**度。例如:短信服务部署多个,只需要有一个节点成功发送即可。 ### 4.2 Pub/Sub 订阅模式 image-20220516092632671 生产者: ```java public class Producer_PubSub { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2. 设置参数 factory.setHost("192.168.64.12");//ip 默认值 localhost factory.setPort(5672); //端口 默认值 5672 factory.setVirtualHost("/example");//虚拟机 默认值/ factory.setUsername("giaming");//用户名 默认 guest factory.setPassword("giaming");//密码 默认值 guest //3. 创建连接 Connection Connection connection = factory.newConnection(); //4. 创建Channel Channel channel = connection.createChannel(); /* exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map arguments) 参数: 1. exchange:交换机名称 2. type:交换机类型 DIRECT("direct"),:定向 FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。 TOPIC("topic"),通配符的方式 HEADERS("headers");参数匹配 3. durable:是否持久化 4. autoDelete:自动删除 5. internal:内部使用。 一般false 6. arguments:参数 */ String exchangeName = "test_fanout"; //5. 创建交换机 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null); //6. 创建队列 String queue1Name = "test_fanout_queue1"; String queue2Name = "test_fanout_queue2"; channel.queueDeclare(queue1Name,true,false,false,null); channel.queueDeclare(queue2Name,true,false,false,null); //7. 绑定队列和交换机 /* queueBind(String queue, String exchange, String routingKey) 参数: 1. queue:队列名称 2. exchange:交换机名称 3. routingKey:路由键,绑定规则 如果交换机的类型为fanout ,routingKey设置为"" */ channel.queueBind(queue1Name,exchangeName,""); channel.queueBind(queue2Name,exchangeName,""); String body = "日志信息:张三调用了findAll方法...日志级别:info..."; //8. 发送消息 channel.basicPublish(exchangeName,"",null,body.getBytes()); //9. 释放资源 channel.close(); connection.close(); } } ``` 消费者: ```java public class Consumer_PubSub1 { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2. 设置参数 factory.setHost("192.168.64.12");//ip 默认值 localhost factory.setPort(5672); //端口 默认值 5672 factory.setVirtualHost("/example");//虚拟机 默认值/ factory.setUsername("giaming");//用户名 默认 guest factory.setPassword("giaming");//密码 默认值 guest //3. 创建连接 Connection Connection connection = factory.newConnection(); //4. 创建Channel Channel channel = connection.createChannel(); String queue1Name = "test_fanout_queue1"; String queue2Name = "test_fanout_queue2"; /* basicConsume(String queue, boolean autoAck, Consumer callback) 参数: 1. queue:队列名称 2. autoAck:是否自动确认 3. callback:回调对象 */ // 接收消息 Consumer consumer = new DefaultConsumer(channel){ /* 回调方法,当收到消息后,会自动执行该方法 1. consumerTag:标识 2. envelope:获取一些信息,交换机,路由key... 3. properties:配置信息 4. body:数据 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { /* System.out.println("consumerTag:"+consumerTag); System.out.println("Exchange:"+envelope.getExchange()); System.out.println("RoutingKey:"+envelope.getRoutingKey()); System.out.println("properties:"+properties);*/ System.out.println("body:"+new String(body)); System.out.println("将日志信息打印到控制台....."); } }; channel.basicConsume(queue1Name,true,consumer); //关闭资源?不要 } } ``` **小结** 1. 交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。 2. 发布订阅模式与工作队列模式的区别: ⚫ 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机 ⚫ 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用 默认交换机) ⚫ 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑定到默认的交换机 ### 4.3 Routing 路由模式 image-20220516092918764 生产者: ```java public class Producer_Routing { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2. 设置参数 factory.setHost("192.168.64.12");//ip 默认值 localhost factory.setPort(5672); //端口 默认值 5672 factory.setVirtualHost("/example");//虚拟机 默认值/ factory.setUsername("giaming");//用户名 默认 guest factory.setPassword("giaming");//密码 默认值 guest //3. 创建连接 Connection Connection connection = factory.newConnection(); //4. 创建Channel Channel channel = connection.createChannel(); /* exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map arguments) 参数: 1. exchange:交换机名称 2. type:交换机类型 DIRECT("direct"),:定向 FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。 TOPIC("topic"),通配符的方式 HEADERS("headers");参数匹配 3. durable:是否持久化 4. autoDelete:自动删除 5. internal:内部使用。 一般false 6. arguments:参数 */ String exchangeName = "test_direct"; //5. 创建交换机 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null); //6. 创建队列 String queue1Name = "test_direct_queue1"; String queue2Name = "test_direct_queue2"; channel.queueDeclare(queue1Name,true,false,false,null); channel.queueDeclare(queue2Name,true,false,false,null); //7. 绑定队列和交换机 /* queueBind(String queue, String exchange, String routingKey) 参数: 1. queue:队列名称 2. exchange:交换机名称 3. routingKey:路由键,绑定规则 如果交换机的类型为fanout ,routingKey设置为"" */ //队列1绑定 error channel.queueBind(queue1Name,exchangeName,"error"); //队列2绑定 info error warning channel.queueBind(queue2Name,exchangeName,"info"); // channel.queueBind(queue2Name,exchangeName,"error"); channel.queueBind(queue2Name,exchangeName,"warning"); String body = "日志信息:张三调用了delete方法...出错误了。。。日志级别:error..."; //8. 发送消息 channel.basicPublish(exchangeName,"info",null,body.getBytes()); //9. 释放资源 channel.close(); connection.close(); } } ``` 消费者: ```java public class Consumer_Routing1 { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2. 设置参数 factory.setHost("192.168.64.12");//ip 默认值 localhost factory.setPort(5672); //端口 默认值 5672 factory.setVirtualHost("/example");//虚拟机 默认值/ factory.setUsername("giaming");//用户名 默认 guest factory.setPassword("giaming");//密码 默认值 guest //3. 创建连接 Connection Connection connection = factory.newConnection(); //4. 创建Channel Channel channel = connection.createChannel(); String queue1Name = "test_direct_queue1"; String queue2Name = "test_direct_queue2"; /* basicConsume(String queue, boolean autoAck, Consumer callback) 参数: 1. queue:队列名称 2. autoAck:是否自动确认 3. callback:回调对象 */ // 接收消息 Consumer consumer = new DefaultConsumer(channel){ /* 回调方法,当收到消息后,会自动执行该方法 1. consumerTag:标识 2. envelope:获取一些信息,交换机,路由key... 3. properties:配置信息 4. body:数据 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { /* System.out.println("consumerTag:"+consumerTag); System.out.println("Exchange:"+envelope.getExchange()); System.out.println("RoutingKey:"+envelope.getRoutingKey()); System.out.println("properties:"+properties);*/ System.out.println("body:"+new String(body)); System.out.println("将日志信息打印到控制台....."); } }; channel.basicConsume(queue1Name,true,consumer); //关闭资源?不要 } } ``` **小结** Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。 ### 4.4 Topics 通配符模式 image-20220516093043244 image-20220516093105963 生产者: ```java public class Producer_Topics { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2. 设置参数 factory.setHost("192.168.64.12");//ip 默认值 localhost factory.setPort(5672); //端口 默认值 5672 factory.setVirtualHost("/example");//虚拟机 默认值/ factory.setUsername("giaming");//用户名 默认 guest factory.setPassword("giaming");//密码 默认值 guest //3. 创建连接 Connection Connection connection = factory.newConnection(); //4. 创建Channel Channel channel = connection.createChannel(); /* exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map arguments) 参数: 1. exchange:交换机名称 2. type:交换机类型 DIRECT("direct"),:定向 FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。 TOPIC("topic"),通配符的方式 HEADERS("headers");参数匹配 3. durable:是否持久化 4. autoDelete:自动删除 5. internal:内部使用。 一般false 6. arguments:参数 */ String exchangeName = "test_topic"; //5. 创建交换机 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null); //6. 创建队列 String queue1Name = "test_topic_queue1"; String queue2Name = "test_topic_queue2"; channel.queueDeclare(queue1Name,true,false,false,null); channel.queueDeclare(queue2Name,true,false,false,null); //7. 绑定队列和交换机 /* queueBind(String queue, String exchange, String routingKey) 参数: 1. queue:队列名称 2. exchange:交换机名称 3. routingKey:路由键,绑定规则 如果交换机的类型为fanout ,routingKey设置为"" */ // routing key 系统的名称.日志的级别。 //=需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库 channel.queueBind(queue1Name,exchangeName,"#.error"); channel.queueBind(queue1Name,exchangeName,"order.*"); channel.queueBind(queue2Name,exchangeName,"*.*"); String body = "日志信息:张三调用了findAll方法...日志级别:info..."; //8. 发送消息 channel.basicPublish(exchangeName,"goods.user.error",null,body.getBytes()); //9. 释放资源 channel.close(); connection.close(); } } ``` 消费者: ```java public class Consumer_Topic1 { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2. 设置参数 factory.setHost("192.168.64.12");//ip 默认值 localhost factory.setPort(5672); //端口 默认值 5672 factory.setVirtualHost("/example");//虚拟机 默认值/ factory.setUsername("giaming");//用户名 默认 guest factory.setPassword("giaming");//密码 默认值 guest //3. 创建连接 Connection Connection connection = factory.newConnection(); //4. 创建Channel Channel channel = connection.createChannel(); String queue1Name = "test_topic_queue1"; String queue2Name = "test_topic_queue2"; /* basicConsume(String queue, boolean autoAck, Consumer callback) 参数: 1. queue:队列名称 2. autoAck:是否自动确认 3. callback:回调对象 */ // 接收消息 Consumer consumer = new DefaultConsumer(channel){ /* 回调方法,当收到消息后,会自动执行该方法 1. consumerTag:标识 2. envelope:获取一些信息,交换机,路由key... 3. properties:配置信息 4. body:数据 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { /* System.out.println("consumerTag:"+consumerTag); System.out.println("Exchange:"+envelope.getExchange()); System.out.println("RoutingKey:"+envelope.getRoutingKey()); System.out.println("properties:"+properties);*/ System.out.println("body:"+new String(body)); System.out.println("将日志信息存入数据库......."); } }; channel.basicConsume(queue1Name,true,consumer); //关闭资源?不要 } } ``` **小结** Topic 主题模式可以实现 Pub/Sub 发布与订阅模式和 Routing 路由模式的功能,只是 Topic 在配置routing key的时候可以使用通配符,显得更加灵活。 ### 4.5 工作模式总结 1. 简单模式 HelloWorld 一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)。 2. 工作队列模式 Work Queue 一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)。 3. 发布订阅模式 Publish/subscribe 需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消 息发送到绑定的队列。 4. 路由模式 Routing 需要设置类型为 direct 的交换机,交换机和队列进行绑定,并且指定 routing key,当发送消息到交换机 后,交换机会根据 routing key 将消息发送到对应的队列。 5. 通配符模式 Topic 需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送 消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。 ## Spring 整合 RabbitMQ 需求:使用 Spring 整合 RabbitMQ 步骤: 生产者: 1. 创建生产者SpringBoot工程 2. 引入start,依赖坐标 ```xml org.springframework.boot spring-boot-starter-amqp ``` 3. 编写yml配置,基本信息配置 ```yml spring: rabbitmq: host: 192.168.64.12 port: 5672 username: guest password: guest virtual-host: / ``` 4. 定义交换机,队列以及绑定关系的配置类 RabbitMQConfig: ```java @Configuration public class RabbitMQConfig { // 交换机 public static final String EXCHANGE_NAME = "boot_topic_exchange"; // 队列 public static final String QUEUE_NAME = "boot_queue"; //1.交换机 @Bean("bootExchange") public Exchange bootExchange(){ return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build(); } //2.Queue 队列 @Bean("bootQueue") public Queue bootQueue(){ return QueueBuilder.durable(QUEUE_NAME).build(); } //3. 队列和交互机绑定关系 Binding /* 1. 知道哪个队列 2. 知道哪个交换机 3. routing key */ @Bean public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs(); } } ``` 5. 注入RabbitTemplate,调用方法,完成消息发送 ```java @SpringBootTest @RunWith(SpringRunner.class) public class ProducerTest { // 注入RabbitTemplate @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSend() { rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "boot.heihei", "boot mq hello~~~"); } } ``` 消费者: 1. 创建消费者SpringBoot工程 2. 引入start,依赖坐标 ```xml org.springframework.boot spring-boot-starter-amqp ``` 3. 编写yml配置,基本信息配置 ```yml spring: rabbitmq: host: 192.168.64.12 port: 5672 username: guest password: guest virtual-host: / ``` 4. 定义监听类,使用@RabbitListener注解完成队列监听。 RabbitMQListener: ```java @Component public class RabbitMQListener { @RabbitListener(queues = "boot_queue") public void listenerQueue(Message message) { System.out.println(new String(message.getBody())); } } ``` **小结** 使用 Spring 整合 RabbitMQ 将组件全部使用配置方式实现,简化编码 Spring 提供 RabbitTemplate 简化发送消息 API 使用监听机制简化消费者编码。