# springboot+RabbitMQ 5种模式详解 **Repository Path**: qindachuan/rabbit ## Basic Information - **Project Name**: springboot+RabbitMQ 5种模式详解 - **Description**: Rabbit 5中模式详解 - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 6 - **Forks**: 0 - **Created**: 2020-12-11 - **Last Updated**: 2022-01-11 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # RabbitMQ 该项目实现了Springboot+RabbitMQ的5种常用模式,并分别代码实现。 pointtopoint包:点对点的队列 work包:work模式 publish包 :发布/订阅者模式(Publish/Subscribe) routing包:路由模式 topic包:主题模式(Topic) ![输入图片说明](https://images.gitee.com/uploads/images/2020/1211/174930_46baf44b_2184510.png "微信截图_20201211174902.png") RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括: 1. 可靠性(Reliability) RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。 2. 灵活的路由(Flexible Routing) 在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。 3. 消息集群(Clustering) 多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。 4. 高可用(Highly Available Queues) 队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。 5. 多种协议(Multi-protocol) RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。 6. 多语言客户端(Many Clients) RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。 7. 管理界面(Management UI) RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方 面。 8. 跟踪机制(Tracing) 如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。 9. 插件机制(Plugin System) RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。 --------------- ![输入图片说明](https://images.gitee.com/uploads/images/2020/1211/172747_67d2570b_2184510.png "20200608134830910.png") ## 主要概念 > **RabbitMQ Server**: 也叫broker server,它是一种传输服务。 他的角色就是维护一条从Producer到Consumer的路线,保证数据能够按照指定的方式进行传输。 > **Producer**: 消息生产者,如图A、B、C,数据的发送方。消息生产者连接RabbitMQ服务器然后将消息投递到Exchange。 > **Consumer**:消息消费者,如图1、2、3,数据的接收方。消息消费者订阅队列,RabbitMQ将Queue中的消息发送到消息消费者。 > **Exchange**:生产者将消息发送到Exchange(交换器),由Exchange将消息路由到一个或多个Queue中(或者丢弃)。Exchange并不存储消息。RabbitMQ中的Exchange有direct、fanout、topic、headers四种类型,每种类型对应不同的路由规则。 > **Queue**:(队列)是RabbitMQ的内部对象,用于存储消息。消息消费者就是通过订阅队列来获取消息的,RabbitMQ中的消息都只能存储在Queue中,生产者生产消息并最终投递到Queue中,消费者可以从Queue中获取消息并消费。多个消费者可以订阅同一个 > > Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。 > **RoutingKey**:生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。在Exchange Type与binding key固定的情况下(在正常使用时一 > > 般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。RabbitMQ为routing key设定的长度限制为255bytes。 > **Connection**:(连接):Producer和Consumer都是通过TCP连接到RabbitMQ Server的。以后我们可以看到,程序的起始处就是建立这个TCP连接。 > **Channels**: (信道):它建立在上述的TCP连接中。数据流动都是在Channel中进行的。也就是说,一般情况是程序起始建立TCP连接,第二步就是建立这个Channel。 > **VirtualHost**:权限控制的基本单位,一个VirtualHost里面有若干Exchange和MessageQueue,以及指定被哪些user使用 ---------------- ## 模式详解 ### 前期准备 pom.xml ```xml org.springframework.boot spring-boot-starter-amqp ``` application.yml ```yml spring: #配置rabbitMq 服务器 rabbitmq: host: 47.95.212.225 #连接端口号 port: 5672 #用户名 username: admin #用户密码 password: admin # 开启发送确认 publisher-confirm-type: correlated # 开启发送失败退回 publisher-returns: true ``` ### 一、点对点的队列 ![输入图片说明](https://images.gitee.com/uploads/images/2020/1211/172831_ac99b5e5_2184510.png "20200610143243678.png") #### 模式描述 一个生产者P对应一个队列Q,一个队列Q由一个消费者C监听。 消费者确认模式有自动确认消息和手动确认消息两种模式。 - true:表示自动确认,只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都会认为消息已经成功消费。 - false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态,并且服务器会认为该消费者已经挂掉,不会再给其发送消息,直到该消费者反馈。 #### 代码实现 ##### 1.队列初始化 ```java //当没有这个队列的时候会自动创建 @Configuration public class PointInitialization { @Bean Queue toPoint(){ Queue queue = new Queue("point.to.point",true); return queue; } } ``` ##### 2.生产者(通过定时任务实现消息生产) ```java @Configuration //1.主要用于标记配置类,兼备Component的效果。 @EnableScheduling // 2.开启定时任务 public class PointProducer { @Autowired RabbitTemplate rabbitTemplate; //3.添加定时任务 @Scheduled(cron = "0/5 * * * * ?") private void configureTasks() { String sendMsg = "点对点队列:" + " " + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); this.rabbitTemplate.convertAndSend("point.to.point", sendMsg); } } ``` ##### 3.消费者 ```java @Component public class PointConsumer { //监听的队列名 @RabbitListener(queues = "point.to.point") public void processOne(String name) { System.out.println("point.to.point:" + name); } } ``` ##### 4.测试结果 ![输入图片说明](https://images.gitee.com/uploads/images/2020/1211/172904_d12cf17d_2184510.png "image-20201211152756056.png") -------------- ### 二、work模式 ![输入图片说明](https://images.gitee.com/uploads/images/2020/1211/172924_3cbcbe59_2184510.png "20200610143309146.png") #### 模式描述 一个生产者P对应一个队列Q,一个队列被多个消费者监听消费。一条消息只能被一个消费这消费。 这里有两种方式: - 轮询方式:队列通过循环的方式将消息对消费者进行分发,总体来说,各个消费者消费的消息是均等的。比如说,当有 C1 和 C2 两个消费者,C1 只会消费奇数消息,C2 只会消费偶数消息。平均每个消费者获得相同数量的消息。 - 公平分发:出现一种现象就是当奇数消息被消费者 C1 消费时间过长,且偶数消息被消费者 C2 消费时间过短时,那么 C1 就会显得很烦忙,而 C2 就会显得很悠闲,很显然这很影响工作效率。那么这种方式就用来解决这个问题,当任何一个消费者消费完周都会应答,而后队列直接分发该消费者信息进行消费。那么在相同时间内 C1 消费的消息肯定没有 C2 多。也叫做"能者多劳"。 > 注:使用basicQos( prefetchCount = 1)方法,来限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。还有一点需要注意,使用公平分发,必须关闭自动应答,改为手动应答。 #### 代码实现 ##### 1.队列初始化 ```java @Configuration public class WorkInitialization { //当没有这个队列的时候会自动创建 @Bean Queue work(){ Queue queue = new Queue("WorkingMode",true); return queue; } } ``` ##### 2.生产者 ```java @Configuration //1.主要用于标记配置类,兼备Component的效果。 @EnableScheduling // 2.开启定时任务 public class WorkProducer { @Autowired RabbitTemplate rabbitTemplate; //3.添加定时任务 @Scheduled(cron = "0/5 * * * * ?") private void configureTasks() { String sendMsg = "工作模式:" + " " + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); //指定队列 this.rabbitTemplate.convertAndSend("WorkingMode", sendMsg); } } ``` ##### 3.消费者 ```java @Component public class WorkConsumer { //三个队列同时监听 @RabbitListener(queues = "WorkingMode") public void processOne(String name) { System.out.println("WorkingMode1:" + name); } @RabbitListener(queues = "WorkingMode") public void processTwo(String name) { System.out.println("WorkingMode2:" + name); } @RabbitListener(queues = "WorkingMode") public void processThree(String name) { System.out.println("WorkingMode3:" + name); } } ``` ##### 4.测试结果 > 注意看时间,说明消息是轮询分发的,一个消息只由一个消费者消费。 ![输入图片说明](https://images.gitee.com/uploads/images/2020/1211/172948_30041483_2184510.png "image-20201211152312924.png") ---------- ### 三、发布/订阅者模式(Publish/Subscribe) ![输入图片说明](https://images.gitee.com/uploads/images/2020/1211/173005_7d0b59fc_2184510.png "20200610143325894.png") #### 模式描述 1. 1个生产者,多个消费者 2. 每一个消费者都有自己的一个队列 3. 生产者没有将消息直接发送到队列,而是发送到了交换机 4. 每个队列都要绑定到交换机(不指定Routing Key) 5. 生产者发送的消息,通过交换机发送到绑定交换机队列,实现一个消息被多个消费者获取的目的。 > 注:交换机没有储存数据的能力,如果该交换机没有绑定任何队列时,这条消息就会被抛弃。 #### 代码实现 ##### 1.队列初始化(类型为fanout) ```java @Configuration public class PublishInitialization { //当没有这个队列的时候会自动创建 @Bean Queue publishOne(){ Queue queue = new Queue("queue.publish.one",true); return queue; } @Bean Queue publishTwo(){ Queue queue = new Queue("queue.publish.two",true); return queue; } @Bean Queue publishThree(){ Queue queue = new Queue("queue.publish.three",true); return queue; } //创建交换器 @Bean FanoutExchange pulishExchange(){ FanoutExchange directExchange = new FanoutExchange("publishExchange"); return directExchange; } //绑定队列(不用指定routing key),参数名字要和bean名字一致 @Bean Binding bindingPublishOne(Queue publishOne, FanoutExchange pulishExchange){ Binding binding = BindingBuilder.bind(publishOne).to(pulishExchange); return binding; } @Bean Binding bindingPublishTwo(Queue publishTwo,FanoutExchange pulishExchange){ Binding binding = BindingBuilder.bind(publishTwo).to(pulishExchange); return binding; } @Bean Binding bindingPublishThree(Queue publishThree,FanoutExchange pulishExchange){ Binding binding = BindingBuilder.bind(publishThree).to(pulishExchange); return binding; } } ``` ##### 2.生产者 ```java @Configuration //1.主要用于标记配置类,兼备Component的效果。 @EnableScheduling // 2.开启定时任务 public class PublishProducer { @Autowired RabbitTemplate rabbitTemplate; //3.添加定时任务 @Scheduled(cron = "0/5 * * * * ?") private void configureTasks() { String sendMsg = "发布订阅模式:" + " " + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); //指定队列 不指定RouteKey this.rabbitTemplate.convertAndSend("publishExchange","",sendMsg); } } ``` ##### 3.消费者 ```java @Component public class PublishConsumer { @RabbitListener(queues = "queue.publish.one") public void processOne(String name) { System.out.println("queue.publish.one:" + name); } @RabbitListener(queues = "queue.publish.two") public void processTwo(String name) { System.out.println("queue.publish.two:" + name); } @RabbitListener(queues = "queue.publish.three") public void processThree(String name) { System.out.println("queue.publish.three:" + name); } } ``` ##### 4.测试结果 > 注意看时间,交换机会将消息推送到所有绑定到它的队列。 ![输入图片说明](https://images.gitee.com/uploads/images/2020/1211/173051_5359a0d9_2184510.png "image-20201211155037444.png") ### 四、路由模式 ![输入图片说明](https://images.gitee.com/uploads/images/2020/1211/173133_7a69c31a_2184510.png "20200610143337929.png") #### 模式描述 根据上述订阅模式类似,跟交换机绑定的队列指定了Routing Key,消息进入交换机后会根据Routing Key,判断将该条消息分发到哪些队列中。 如图举例,当Routing Key为error时,交换机会将消息分发到两条队列中,当Routing Key为info时,交换机只会将消息分发给下方的队列。 #### 代码实现 ##### 1.队列初始化(类型为direct) ```java @Configuration public class RoutingInitialization { //当没有这个队列的时候会自动创建 @Bean Queue routingOne(){ Queue queue = new Queue("queue.routing.one",true); return queue; } @Bean Queue routingTwo(){ Queue queue = new Queue("queue.routing.two",true); return queue; } @Bean Queue routingThree(){ Queue queue = new Queue("queue.routing.three",true); return queue; } //创建交换器 @Bean DirectExchange routingExchange(){ DirectExchange directExchange = new DirectExchange("routingExchange"); return directExchange; } //绑定队列 @Bean Binding bindingRoutingOne(Queue routingOne, DirectExchange routingExchange){ Binding binding = BindingBuilder.bind(routingOne).to(routingExchange).with("1"); return binding; } @Bean Binding bindingRoutingTwo(Queue routingTwo,DirectExchange routingExchange){ Binding binding = BindingBuilder.bind(routingTwo).to(routingExchange).with("2"); return binding; } @Bean Binding bindingRoutingThree(Queue routingThree,DirectExchange routingExchange){ Binding binding = BindingBuilder.bind(routingThree).to(routingExchange).with("3"); return binding; } } ``` ##### 2.生产者 ```java @Configuration //1.主要用于标记配置类,兼备Component的效果。 @EnableScheduling // 2.开启定时任务 public class RoutingProducer { @Autowired RabbitTemplate rabbitTemplate; //3.添加定时任务 @Scheduled(cron = "0/5 * * * * ?") private void configureTasks() { String type=getType(); System.out.println("随机获取到的数字是:"+type); String sendMsg = "路由模式:" + type + " " + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); //指定队列 if (type.equals("1")){ this.rabbitTemplate.convertAndSend("routingExchange","1",sendMsg); } if (type.equals("2")){ this.rabbitTemplate.convertAndSend("routingExchange","2",sendMsg); } if (type.equals("3")){ this.rabbitTemplate.convertAndSend("routingExchange","3",sendMsg); } } //随机生成一个[1-3]的int private String getType() { //生成一个范围的随机数, 如:[min, max] int max = 3; int min = 1; Random random = new Random(); int s = random.nextInt(max - min + 1) + min; return String.valueOf(s); } } ``` ##### 3.消费者 ```java @Component public class RoutingConsumer { @RabbitListener(queues = "queue.routing.one") public void processOne(String name) { System.out.println("queue.routing.one:" + name); } @RabbitListener(queues = "queue.routing.two") public void processTwo(String name) { System.out.println("queue.routing.two:" + name); } @RabbitListener(queues = "queue.routing.three") public void processThree(String name) { System.out.println("queue.routing.three:" + name); } } ``` ##### 4.测试结果 > 这里的随机数1,2,3实际就是routing key ![输入图片说明](https://images.gitee.com/uploads/images/2020/1211/173208_3543a380_2184510.png "image-20201211162309453.png") ![输入图片说明](https://images.gitee.com/uploads/images/2020/1211/173226_2071ebbe_2184510.png "image-20201211162340764.png") -------------- ### 五、主题模式(Topic) ![输入图片说明](https://images.gitee.com/uploads/images/2020/1211/173244_bbf593d0_2184510.png "20201127010059819.png") #### 模式描述 符号 # 匹配一个或多个词,符号 * 匹配不多不少一个词。 任何发送到Topic Exchange的消息都会被转发到所有关心Routing Key中指定话题的Queue上。 `usa.#` 能够匹配到 `usa.news.XXX`,但是 `usa.*` 只会匹配到 `usa.XXX` 。 如果Exchange没有发现能够与RouteKey匹配的Queue,则会抛弃此消息 。 #### 代码实现 ##### 1.队列初始化(类型为Topic) ```java @Configuration public class TopicInitialization { //当没有这个队列的时候会自动创建 @Bean Queue topicOne(){ Queue queue = new Queue("queue.topic.one",true); return queue; } @Bean Queue topicTwo(){ Queue queue = new Queue("queue.topic.two",true); return queue; } @Bean Queue topicThree(){ Queue queue = new Queue("queue.topic.three",true); return queue; } //创建交换器 @Bean TopicExchange topicExchange(){ TopicExchange directExchange = new TopicExchange("topicExchange"); return directExchange; } //绑定队列 @Bean Binding bindingTopicOne(Queue topicOne,TopicExchange topicExchange){ Binding binding = BindingBuilder.bind(topicOne).to(topicExchange).with("#.error"); return binding; } @Bean Binding bindingTopicTwo(Queue topicTwo,TopicExchange topicExchange){ Binding binding = BindingBuilder.bind(topicTwo).to(topicExchange).with("#.log"); return binding; } @Bean Binding bindingTopicThree(Queue topicThree,TopicExchange topicExchange){ Binding binding = BindingBuilder.bind(topicThree).to(topicExchange).with("good.#.timer"); return binding; } } ``` ##### 2.生产者 ```java @Configuration //1.主要用于标记配置类,兼备Component的效果。 @EnableScheduling // 2.开启定时任务 public class TopicProducer { @Autowired RabbitTemplate rabbitTemplate; //3.添加定时任务 @Scheduled(cron = "0/5 * * * * ?") private void configureTasks() { String routing=getRouting(); System.out.println("随机获取到的字符串是:"+routing); String sendMsg = "主题模式:" +routing+ " " + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); //指定队列 this.rabbitTemplate.convertAndSend("topicExchange",routing,sendMsg); } private String getRouting(){ //生成一个范围的随机数, 如:[min, max] int max = 2; int min = 0; Random random = new Random(); int s = random.nextInt(max - min + 1); //String数组 String[] str={"测试.error","日志.log","不匹配.timer","good.匹配.timer"}; return str[s]; } } ``` ##### 3.消费者 ```java @Component public class TopicConsumer { @RabbitListener(queues = "queue.topic.one") public void processOne(String name) { System.out.println("queue.topic.one:" + name); } @RabbitListener(queues = "queue.topic.two") public void processTwo(String name) { System.out.println("queue.topic.two:" + name); } @RabbitListener(queues = "queue.topic.three") public void processThree(String name) { System.out.println("queue.topic.three:" + name); } } ``` ##### 4.测试结果 ![输入图片说明](https://images.gitee.com/uploads/images/2020/1211/173309_c62d5547_2184510.png "image-20201211165457100.png")