# rabbitmq
**Repository Path**: L1692312138/rabbitmq
## Basic Information
- **Project Name**: rabbitmq
- **Description**: No description available
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 1
- **Forks**: 0
- **Created**: 2022-03-15
- **Last Updated**: 2022-03-18
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# 前言
此项目为学习RabbitMQ中间件,内容包括:
- RabbitMQ的API用法
- 通讯方式
- 整体架构
- 与SpringBoot项目整合
- 保证消息可靠性
- 集群可靠性
如果对你有帮助,就支持一下吧^_^
Author:LiuShihao
Last Update Time:2022-03-18
# 一、安装部署
国内 Docker 镜像网站: https://hub.daocloud.io/
(需要部署Docker环境)
```yaml
version: "3.1"
services:
rabbitmq:
image: daocloud.io/library/rabbitmq:3.8.5
container_name: rabbitmq
restart: always
volumes:
- ./data/:/var/lib/rabbitmq/
ports:
- 5672:5672
- 15672:15672
```
运行`docker-compose up -d` 执行yml文件
```shell
# 在Linux内部执行: curl localhost:5672 出现AMQP 安装成功
# 开启可视化界面
# 进入容器内部
docker exec -it rabbitmq bash
# 进入 /opt/rabbitmq ,找到 sbin 和 plugins 文件夹
# 在plugins 目录下会有rabbitmq_managemengt插件,然后在哎sbin目录下执行
./rabbitmq-plugins enable rabbitmq_managemeng
# 访问可视化管理界面 浏览器输入: 你的服务器IP:15672 用户名和密码 都是guest
```
# 二、通讯方式(操作RabbitMQ API)
结构Maven项目,导入Pom依赖:
```xml
com.rabbitmq
amqp-client
5.9.0
junit
junit
4.13.1
```
## 2.1 "Hello World!"

- 一个生产者
- 一个消费者
- 一个队列
- 使用默认交换机
- 默认路由(为队列名)
## 2.2 Work queues

- 一个生产者
- 两个消费者(进行轮询消费,每个消息只会被成功的消费一次)
- 一个队列
- 默认交换机
- 默认路由
## 2.3 Publish/Subscribe

发布/订阅模式(FANOUT分裂模式)
- 一个生产者
- 一个FUNOUT类型交换机(这种模式交换机和队列直接绑定,不需要RoutingKey)
- 两个队列
- 两个消费者
## 2.4 Routing

DIRECT直接模式
- 一个生产者
- 一个DIRECT模式交换机(交换机通过不同的RoutingKey绑定队列)
- 两个队列(一个队列可以绑定多个路由规则,RoutingKey如果没有对应的队列则会被丢弃)
- 两个消费者
## 2.5 Topics

TOPIC主题模式
- 一个生产者
- 一个TOPIC类型交换机(通过不同的路由规则绑定队列)
- 两个队列
- 两个消费者
(注意:需要以aaa.bbb.ccc..方式编写routingkey ,其中有两个特殊字符:*(相当于占位符),#(相当通配符))
## 2.6 RPC

Client/Server RPC模式,通过队列进行解耦
Client 发送请求消息到达 请求消息队列,并且监听 响应消息队列
Server 监听请求消息队列,并且回复响应信息 到响应消息队列
整个流程需要携带两个参数:
1. replyTo: 告知Server将相应信息放到哪个队列 (指定响应队列)
2. correlationId:告知Server发送相应消息时,需要携带位置标示来告知Client响应的信息(响应消息对应请求消息)
## 2.7 Headers
```java
/**
* @author :LiuShihao
* @date :Created in 2022/3/17 4:14 下午
* @desc :Headers类型交换机
*/
public class HeadersPublisher {
public static final String EXCHANGE_NAME = "headers-exchange";
public static final String QUEUE_NAME = "headers-queue";
@Test
public void publisher() throws Exception {
// 1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
//2.构建Channel
Channel channel = connection.createChannel();
//3.构建Headers类型交换机,创建队列并绑定
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
Map arguments = new HashMap<>();
//x-match 设置为all:表示所有条件都满足才能路由,any:表示有一个条件满足就可以路由
arguments.put("x-match","all");
// arguments.put("x-match","any");
arguments.put("name","jack");
arguments.put("age","23");
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"",arguments);
HashMap headers = new HashMap<>();
headers.put("name","jack");
headers.put("age","23");
//4.发送消息
String msg = "这是Headers类型消息";
AMQP.BasicProperties properties = new AMQP.BasicProperties()
.builder()
.headers(headers)
.build();
channel.basicPublish(EXCHANGE_NAME,"",properties,msg.getBytes());
System.out.println("消息已发送");
System.in.read();
}
}
```
# 三、RabbitMQ整合Spring
## 3.1 导入依赖
```pom
org.springframework.boot
spring-boot-starter-parent
2.2.2.RELEASE
org.springframework.boot
spring-boot-starter-test
org.springframework.boot
spring-boot-starter-amqp
```
## 3.2 application.yml配置RabbitMQ信息
```yml
spring:
rabbitmq:
host: 172.16.98.100
port: 5672
username: admin
password: admin
virtual-host: /
```
## 3.3 配置类声明交换机、队列和绑定
```Java
/**
* @author :LiuShihao
* @date :Created in 2022/3/15 11:36 下午
* @desc :配置类声明交换机、队列和绑定
*/
@Configuration
public class RabbitMQConfig {
/**Topic类型交换机*/
public static final String TOPIC_EXCHANGE_NAME = "boot-exchange";
/**Queue队列名*/
public static final String QUEUE_NAME = "boot-queue";
/**RoutingKey 路由Key*/
public static final String ROUTING_KEY = "*.black.*";
/**
* 声明交换机: 在SpringBoot项目中,直接通过ExchangeBuilder来构造交换机
* @return org.springframework.amqp.core.Exchange
*/
@Bean
public Exchange exchange(){
// => channel.DeclareExchange
Exchange exchange = ExchangeBuilder.topicExchange(TOPIC_EXCHANGE_NAME).build();
return exchange;
}
/**
* 声明队列:在SpringBoot中,通过QueueBuilder.durable(队列名)来构造队列
* @return
*/
@Bean
public Queue queue(){
Queue queue = QueueBuilder.durable(QUEUE_NAME).build();
return queue;
}
/**
* 声明绑定:在SpringBoot项目中,通过BindingBuilder.bind(队列).to(交换机).with(路由Key)构造绑定
* @param exchange 交换机
* @param queue 队列
* @return
*/
@Bean
public Binding binding(Exchange exchange,Queue queue){
Binding binding = BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY).noargs();
return binding;
}
}
```
## 3.4 生产消息
在SpringBoot项目直接通过RabbitTemplate对象进行操作
```java
/**
* @author :LiuShihao
* @date :Created in 2022/3/15 11:58 下午
* @desc :
* 在SpringBoot项目中,通过rabbitTemplate.convertAndSend()生产消息
*/
@SpringBootTest
@RunWith(SpringRunner.class)
public class SendMQTest {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void send(){
//交换机、路由Key、消息内容
rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE_NAME,"little.black.rabbit","小黑兔");
System.out.println("消息已发送");
}
/**
* 生产消息
*/
@Test
public void sendAndMsgProperties(){
//交换机、路由Key、消息内容、MessageProperties(传递Msg信息:包括CorrelationId、ReplyTo等)
rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE_NAME, "little.black.rabbit", "小黑兔", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties messageProperties = message.getMessageProperties();
//设置唯一标识
messageProperties.setCorrelationId("123");
//设置响应队列
// messageProperties.setReplyTo();
return message;
}
});
System.out.println("消息已发送");
}
}
```
## 3.5 监听消息
```java
/**
* @author :LiuShihao
* @date :Created in 2022/3/16 2:21 下午
* @desc :在SpringBoot项目中监听消息,通过@RabbitListener(queues = "队列名") 注解监听队列
* 在SpringBoot项目中,
* 如果要关闭自动ack需要在application.yml文件中设置
* spring.rabbitmq.listener.simple.acknowledge-mode为manual
*
*/
@Component
public class ConsumeListener {
/**
*
* @param msg 队列的消息
* @param channel
* @param message 包含消息的各种信息,如msg、DeliveryTag、CorrelationId、ReplyTo等信息
* @throws IOException
*/
@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
public void consumer(String msg, Channel channel, Message message) throws IOException {
System.out.println("队列的消息:"+msg);
String correlationId = message.getMessageProperties().getCorrelationId();
System.out.println("唯一标识:"+correlationId);
//手动ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
```
# 四、消息可靠性
## 4.1 确保消息到达交换机(confirms机制)
## 4.2 确保消息从交换机路由到达队列(return机制)
## 4.3 确保消息持久化(消息持久化)
```java
/**
* @author :LiuShihao
* @date :Created in 2022/3/16 2:49 下午
* @desc :开启消息确认机制
* 1. 确保消息到达交换机 confirms机制
* 1.开启confirms: channel.confirmSelect();
* 2.增加异步回调:channel.addConfirmListener();
* 2.确保消息从交换机路由到达队列 return机制
*/
public class ConfirmsPublisher {
public static final String QUEUE_NAME = "confirms";
@Test
public void publisher() throws Exception {
// 1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
//2.构建Channel
Channel channel = connection.createChannel();
//3.构建队列 注意此处的durable参数只是控制队列持久化,并不能控制消息持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//4.开启confirms
channel.confirmSelect();
//5.设置confirms的异步回调
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
//消息成功发送到交换机 success
System.out.println("消息成功发送到交换机!");
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
//消息未成功发送到交换机 fail
System.out.println("消息未成功发送到交换机!");
}
});
//6.设置Return回调,确认消息是否到达队列,需要在发送消息时,设置mandatory参数为true开启Return机制
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
//只有消息没有到达指定队列时,才会触发此函数(例如RoutingKey不对导致消息没有投递到对应队列,就会触发该回调函数)
System.out.println("消息没有到达指定队列!");
}
});
//7.开启消息持久化,如果没有开启消息持久化。如果MQ重启,则消息会丢失
AMQP.BasicProperties pop = new AMQP.BasicProperties()
.builder()
//设置deliveryMode为2表示开启消息持久化,MQ重启后消息不会消失
.deliveryMode(2)
.build();
String message = "Confirms Messaage!";
//8.发送消息 注意:此处需要设置mandatory参数为true,才能开启Return机制
channel.basicPublish("","confirms",true,pop,message.getBytes());
System.out.println("消息发送成功!");
//read方法阻塞,查看WEB可视化界面的客户端连接数
System.in.read();
}
}
```
## 4.4 确保消息正常被消费(手动ack)
```java
/**
* @author :LiuShihao
* @date :Created in 2022/3/16 3:41 下午
* @desc :监听队列消息,关闭自动ack,手动ack
*/
public class ConfirmsConsumer {
public static final String QUEUE_NAME = "confirms";
@Test
public void consumer() throws Exception {
//1.获得连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
//2.获得Channel
final Channel channel = connection.createChannel();
//3.声明队列,持久化队列 durable为true
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//4.设置CallBack函数
DefaultConsumer callback = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("监听到队列消息:" + new String(body, "UTF-8"));
//手动ack
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//5.监听队列,关闭自动ack
channel.basicConsume(QUEUE_NAME,false,callback);
System.out.println("开始监听队列");
System.in.read();
}
}
```
## 4.5 SpringBoot项目实现消息可靠性
1. 在application.yml配置文件中通过配置spring.rabbitmq.publisher-confirm-type为correlated开启confirms机制
2. 在rabbitTemplate.setConfirmCallback()设置confirms机制的回调函数
3. application.yml配置spring.rabbitmq.publisher-returns为true开启Return机制。
4. 通过rabbitTemplate.setReturnCallback()方法这是Return机制的回调函数。
5. 设置消息持久化
```java
/**
* 通过rabbitTemplate.setConfirmCallback()开启confirms机制
*/
@Test
public void sendWithConfirms(){
//1.设置Confirms机制
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack){
System.out.println("消息已送达交换机!");
}else {
System.out.println("消息未到达交换机!");
}
}
});
//2.设置Return机制
//注意:低版本使用setReturnCallback()方法;在高版本中该方法被弃用,使用setReturnsCallback()方法
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
String msg = new String(message.getBody());
System.out.println("消息未成功投递到队列:"+msg);
}
});
//注意 :只用SpringBoot项目投递消息时,不需要在设置mandatory参数为true
//3.开启消息持久化
//发送消息 设置消息持久化 message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
rabbitTemplate.convertAndSend("", "confirmss", "SpringBoot Confirms Message!", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// MessageDeliveryMode枚举类:
// NON_PERSISTENT 表示不持久化 ;PERSISTENT表示持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
}
});
}
```
# 五、死信队列&延时交换机
## 5.1 死信队列
成为死信有三种方式:
1. 消息被拒绝并且禁止重新被投放回队列
2. 消息过期(设置消息的TTL或者设置队列的TTL ,两种方式都能使消息过期)
3. 队列内消息超过最大队列长度
### 5.1.1 拒绝消息
```java
//1.监听普通队列消息,拒绝或者不ack消息,并且requeue为false禁止重新投递队列,则消息会进入死信队列
@RabbitListener(queues = DeadLetterConfig.NORMAL_QUEUE)
public void consumer1(String msg, Channel channel, Message message) throws IOException {
System.out.println("监听到普通队列消息:"+msg);
//拒绝消息或者不ack确认消息
//设置拒绝消息,并禁止重新投递队列requeue=false
// channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
// requeue如果为true则重新排队,如果为false则被丢弃或者进入死信队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
}
```
### 5.1.2 消息过期
设置TTL(TimeToLive)有两种方式:
1. 直接设置消息的过期时间(存在问题,如果第一个消息TTL为30秒,第二个消息TTL为3秒,此时第二个消息必须等待第一个消息过期之后才能过期)
2. 通过设置队列的过期时间
注意:通过消息过期的方式使消息进入死信队列,消费者不能监听普通队列,需要监听死信队列。
```
//1.设置消息的TTL
@Test
public void sendDeadLetterQueueAndSetTTL(){
rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE, "normal.ttl", "通过设置TTL消息存活时间,使消息进入死信队列", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//设置消息存活时间 String 类型 单位为毫秒
message.getMessageProperties().setExpiration("5000");
return message;
}
});
System.out.println("消息已发送");
}
//2.设置队列的TTL
@Bean
public Queue normalQueue(){
return QueueBuilder.durable(NORMAL_QUEUE)
.deadLetterExchange(DEAD_EXCHANGE)
.deadLetterRoutingKey("dead.abd")
.ttl(5000)//设置队列的TTL 单位为毫秒
.maxLength(1)
.build();
}
```
### 5.1.3 队列最大长度
在声明队列的时候设置队列的最大长度,则超过这个最大长度后的消息都会被进入死信队列或者被丢弃
```
@Bean
public Queue normalQueue(){
return QueueBuilder.durable(NORMAL_QUEUE)
.deadLetterExchange(DEAD_EXCHANGE)
.deadLetterRoutingKey("dead.abd")
.ttl(5000)
.maxLength(1) //设置队列最大长度
.build();
}
```
## 5.2 延时交换机(RabbitMQ Plugins)
使用延时交换机的方式对消息延时是最合适的,因为以上的方式都存在问题:
1. 如果给消息设置过期时间,队列消息是按顺序消费的,后面的消息只能等前面那消息处理后才能被处理,如果后面的消息已经过期了但是前面的消息还没有被处理,则后面的消息无法被过期
2. 如果给队列设置过期时间,则更不方便,如果消息需要分别延时不同的时间的话,那就只能分别创建多个不同的队列
这种方式是将消息直接存放在队列中等待过期的,时间不准确,并且有弊端。
而直接使用延时交换机来进行延时处理的话,消息是被存放在延时交换机中的,等待到达延时时间后,才会被投递到队列中,从而直接消费。
注意:由于消息延时在交换机中,未到达队列中,所以如果如果消息设置了Return机制,则由于消息被延时投递,还未到达队列此时会触发Return回调函数,
并且如果此时RabbitMQ服务重启了,存在延时交换机中的消息会被丢失。
### 5.2.1 下载延时交换机插件
延迟交换机属于RabbitMQ的插件了,需要下载插件,开启配置才能实现消息延时
官网插件地址:https://www.rabbitmq.com/community-plugins.html
延时交换机下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/
插件下载后,将插件复制到RabbitMQ的`plugins`目录下,然后进入sbin目录,执行命令:`rabbitmq-plugins enable rabbitmq_delayed_message_exchange`
### 5.2.2
声明延时交换机
```java
/**
* @author :LiuShihao
* @date :Created in 2022/3/17 10:05 上午
* @desc :构造延时交换机
*/
@Configuration
public class DelayedExchangeConfig {
public static final String DELAYED_EXCHANGE_NAME = "boot-delayed-exchange";
public static final String DELAYED_QUEUE_NAME = "boot-delayed-queue";
public static final String DELAYED_ROUTING_KEY = "*.delayed.*";
//普通队列
@Bean
public Queue delayedQueue(){
return QueueBuilder.durable(DELAYED_QUEUE_NAME).build();
}
/**
* 构造延时交换机
* 1、构造arguments参数 指定交换机类型x-delayed-type为topic
* 2、指定type为x-delayed-message类型
*/
@Bean
public Exchange delayedExchange(){
HashMap arguments = new HashMap<>();
arguments.put("x-delayed-type","topic");
CustomExchange customExchange = new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
return customExchange;
}
@Bean
public Binding delayedBinding(Queue delayedQueue,Exchange delayedExchange){
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
```
发送延时消息
```java
//向延时交换机投递延时消息,如果如果消息设置了Return机制,则由于消息被延时投递,还未到达队列此时会触发Return回调函数
@Test
public void sendDelayedExchange(){
rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKe)->{
System.out.println("消息未投递到队列");
});
rabbitTemplate.convertAndSend(DelayedExchangeConfig.DELAYED_EXCHANGE_NAME, "little.delayed.rabbit", "小延时兔子", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//设置延时时间 单位为毫秒
message.getMessageProperties().setDelay(30000);
return message;
}
});
System.out.println("消息已发送");
}
```
# 六、集群高可用
略