# my-mq
**Repository Path**: panleiming/my-mq
## Basic Information
- **Project Name**: my-mq
- **Description**: 消息队列的使用
- **Primary Language**: Java
- **License**: Apache-2.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2021-04-30
- **Last Updated**: 2021-07-12
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# 目录
1. mq简介
1. 什么是消息队列
2. MQ具备的特点
3. 主要作用
4. 带来的问题
2. RabbitMQ
1. 安装
1. docker安装RabbitMQ集群
2. 工作模型
1. Broker
2. Connection
3. Channel
4. Queue
5. Consumer
6. Exchange
7. Vhost
3. 路由方式
1. Direct直连
2. Topic主题
3. Fanout广播
4. 消息的延迟投递
1. 死信队列实现
2. 使用rabbitmq_delayed_message_exchange实现
5. 服务端限流
1. 队列长度
2. 内存控制
3. 磁盘控制
6. 消费端限流
7. SpringBoot集成RabbitMQ
1. SpringBoot参数
8. 可靠性投递
1. 消息发送到RabbitMQ服务器
1. Transaction(事务)模式
# mq简介
## 什么是消息队列
消息队列,又叫消息中间件。是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息队列模型,可以在分布式环境下扩展进程的通信。
## MQ具备的特点
1. 是一个独立运行的服务。生产者发送消息,消费者接收消息,需要先跟服务器建立连接。
2. 采用队列作为数据结构,有先进先出的特点。
3. 具有发布订阅的模型,消费者可以获取自己需要的消息。
## 主要作用
1. 实现异步通信。对于数据量大或者处理耗时长的操作,可以引入MQ实现异步通信,减少客户端的等待,提升响应速度,优化客户体验。
2. 实现系统解耦。对于改动影响大的系统之间,可以引入MQ实现解耦,减少系统之间的直接依赖,提升可维护性和可扩展性。
3. 实现流量削峰。对于瞬时的流量峰值的系统,可以引入MQ实现流量削峰,达到保护应用和数据库的目的。
4. 实现广播通信。支持一对多的通信。
## 带来的问题
1. 运维成本增加
2. 系统的可用性降低
3. 系统复杂性提高
# RabbitMQ
## 安装
### docker安装RabbitMQ集群
1. 拉取带management的RabbitMQ镜像
```
docker pull rabbitmq:3.7.17-management
```
2. 创建专门给MQ通信的网络
```
docker network create rabbitmqnet
```
3. 创建三个容器,端口分别是 5673 5674 5675 ,管理端口是 15673 15674 15675
```
节点1
docker run -d \
--name=rabbitmq1 \
-p 5673:5672 \
-p 15673:15672 \
-e RABBITMQ_NODENAME=rabbitmq1 \
-e RABBITMQ_ERLANG_COOKIE='RABBITMQ' \
-h rabbitmq1 \
--net=rabbitmqnet \
rabbitmq:management
节点2
docker run -d \
--name=rabbitmq2 \
-p 5674:5672 \
-p 15674:15672 \
-e RABBITMQ_NODENAME=rabbitmq1 \
-e RABBITMQ_ERLANG_COOKIE='RABBITMQ' \
-h rabbitmq2 \
--net=rabbitmqnet \
rabbitmq:management
节点3
docker run -d \
--name=rabbitmq3 \
-p 5675:5672 \
-p 15675:15672 \
-e RABBITMQ_NODENAME=rabbitmq1 \
-e RABBITMQ_ERLANG_COOKIE='RABBITMQ' \
-h rabbitmq3 \
--net=rabbitmqnet \
rabbitmq:management
```
4. 将后面两个节点添加进容器
```
节点2
docker exec -it rabbitmq2 /bin/bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbitmq1@rabbitmq1
rabbitmqctl start_app
节点3
docker exec -it rabbitmq3 /bin/bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbitmq1@rabbitmq1
rabbitmqctl start_app
```
5. 进行访问http://ip:15673/#/
## 工作模型

### Broker
使用RabbitMQ来收发消息,必须安装一个RabbitMQ的服务,默认是5672的端口。这台RabbitMQ的服务器就叫做Broker,中文翻译是代理/中介,因为MQ服务器做的事情就是存储、转发消息。
### Connection
无论是生产者发送消息,还是消费者接收消息,都必须要跟Broker之间建立一个连接,这个连接是一个TCP的长连接。
### Channel
如果所有的生产者发送消息和消费者接收消息,都直接创建和释放TCP长连接的话,对于Broker来说肯定会造成很大的性能损耗,也会浪费时间。
所以在AMQP里面引入了Channel的概念,它是一个虚拟连接,译为通道,或者消息通道。这样就可以在保持的TCP长连接里面去创建和释放Channel,大大减少资源消耗。
不同的Channel是相互隔离的,每个Channel都有自己的编号。对于每个客户端线程来说,Channel就没必要共享了,各自使用各自的Channel。
需要注意的是,Channel是RabbitMQ原生API里面的最重要的编程接口,也就是定义交换机、队列、绑定关系、发送消息、消费消息,调用的都是Channel接口上的方法。
### Queue
在Broker上有一个对象用来存储消息,在RabbitMQ里面这个对象叫做Queue。实际上RabbitMQ是用数据库来存储消息的。
队列也是生产者和消费者的纽带,生产者发送的消息到达队列,在队列中存储。消费者从队列消费消息。
### Consumer
消费者消费消息有两种模式,一种是Pull模式,一种是Push模式。
Pull模式,对应的方法是basicGet。消息存放在服务器端,只有消费者主动获取才能得到信息。如果每隔一段时间获取一次消息,消息的实时性会降低。但是好处是可以根据自己的消费能力决定获取消息的频率。
Push模式,对应的方式是basicConsumer。只要生产者发消息到服务器,就马上推送给消费者,消息保存在客户端,实时性很高,如果消费不过来可能会造成消息积压。
由于队列有FIFO的特性,只有确定前一条消息被消费者接收后,Broker才会把这条消息从数据库删除,继续投递下一条消息。
一个消费者是可以监听多个队列的,一个队列也可以被多个消费者监听。如果需要提升处理消息的能力,可以增加多个消费者。这个时候消息会在多个消费者之间轮询。
### Exchange
路由消息组件。不管有多少队列需要接收消息,都只需要发送到Exchange,有它进行分发。Exchange是不会存储消息的,它只做一件事情,根据规则分发消息。
Exchange和这些需要接收消息的队列必须建立一个绑定关系,并且为每个队列指定一个特殊标志。
Exchange和队列是多对多的绑定关系,也就是说,一个交换机的消息可以路由给多个队列,一个队列也可以接收来自多个交换机的消息。
绑定关系建立好后,生产者发送消息到Exchange,也会携带一个特殊的标志。当这个标志跟绑定的标志匹配时,消息就会发给一个或者多个符合规则的队列。
### Vhost
虚拟主机。Vhost除了可以提高硬件资源的利用率之外,还可以实现资源的隔离和权限的控制。它的作用类似于编程语言的namespace和package。
## 路由方式
### Direct直连
一个队列与直连类型的交换机绑定,需指定一个明确的绑定键(binding key)。
生产者发送消息时会携带一个路由键(routing key)。
当消息的路由键与某个队列的绑定键完全匹配时,这条消息才会从交换机路由到这个队列上。多个队列也可以使用相同的绑定键。
直连类型的交换机适用于一些业务用途明确的消息。
### Topic主题
一个队列与主题类型的交换机绑定时,可以在绑定键中使用通配符。支持两个通配符:
* #代表0个或者多个单词
* *代表不多不少一个单词
单词指的是用英文的点“.”隔开的字符。
### Fanout广播
广播类型的交换机与队列绑定时,不需要指定绑定键。因此生产者发送消息到广播类型的交换机上,也不需要携带路由键。消息达到交换机时,所有与之绑定了的队列,都会收到相同消息的副本。
## 消息的延迟投递
### 死信队列实现
**死信:**消息过期以后,如果没有任何配置,是会直接丢弃的,我们可以通过配置让这样的消息变成死信(Dead Letter),在别的地方存储。
队列在创建的时候可以指定一个死信交换机DLX(Dead Letter Exchange)。死信交换机绑定的队列被称为死信队列DLQ(Dead Letter Queue),DLX实际上也是普通的交换机,DLQ也是普通的队列。
如果消息过期了,队列指定了DLX,就会发送到DLX。如果DLX绑定了DLQ,就会路由到DLQ。路由到DLQ后就可以消费了。
实现方式:
1. 队列需要绑定对应的死信交换机。
2. 需要对消息设置过期时间。
流转流程:生产者——原交换机——原队列(超过TTL之后)——死信交换机——死信队列——最终消费者。

**缺点:**
1. 如果统一用队列来设置消息的TTL,当梯度非常多的情况下,需要创建很多交换机和队列来路由消息。
2. 如果单独设置消息的TTL,则可能会造成队列中的消息阻塞——前一条消息没有出队(没有被消费),后面的消息无法投递。
3. 可能存在一定的时间误差。
### 使用rabbitmq_delayed_message_exchange实现
**安装插件**
```
// 进入插件目录
// 下载插件
wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1ez
```
## 服务端限流
### 队列长度
队列中有两个控制长度的属性:
**x-max-length**:队列中最大存储最大消息数,超过这个数量,队头的消息会被丢弃。
**x-max-length-bytes**:队列中存储的最大消息容量(单位bytes),超过这个容量,队头的消息会被丢弃。
**注意:如果该队列设置了DLX(死信交换机),丢弃的消息会被转发到死信交换机上。**
缺点:设置队列长度只在消息堆积的情况下有意义,而且会删除先入队的消息,不能真正地实现服务端限流。
### 内存控制
RabbitMQ会在启动时检测机器的物理内存数值。默认当MQ占用40%以上内存时,MQ会主动抛出一个内存警告并阻塞所有连接(Connections)。可以通过修改**rabbitmq.conf**文件来调整内存阈值,默认值是0.4,如下所示:
```
rabbitmqctl set_vm_memory_high_watermark 0.3
```
### 磁盘控制
通过磁盘来控制消息的发布。当磁盘剩余可用空间低于指定的值时(默认50M),触发流控措施。
举个例子:指定磁盘的30%或者2GB
```
disk_free_limit.relative=3.0
disk_free_limit.absolute=2GB
```
## 消费端限流
默认情况下,如果不进行配置,RabbitMQ会尽可能快速地把队列中的消息发送到消费者。因为消费者会在本地缓存消息,如果消息数量过多,可能会导致OOM或者影响其他进程的正常运行。
在消费者处理消息的能力有限时,例如消费者数量太少,或者单条消息的处理时间过长的情况下,如果希望在一定数量的消息消费完之前,不再推送消息过来,就要用到消费端的流量限制措施。
可以基于Consumer或者channel设置prefetch count的值,含义为Consumer端的最大的unacked messages数目。当超过这个数值的消息未被确认,RabbitMQ会停止投递新的消息给消费者。
```
channel.basicQos(2); // 如果超过2条消息没有发送ACK,当前消费者不再接受队列信息
channel.basicConsumer(QUEUE_NAME, false, consumer);
```
## SpringBoot集成RabbitMQ
工作流程图:

### SpringBoot参数
**注意:基于SpringBoot2.1.5,前缀为spring.rabbitmq,全部配置总体上分成三大类:连接类、消息消费类、消息发送类**
**连接类属性**
| 属性值 | 说明 | 默认值 |
| --------------------------- | ------------------------------------------------------------ | --------- |
| address | 客户端连接的地址,有多个的时候使用逗号分隔,改地址可以是IP与Port的结合 | |
| host | RabbitMQ的主机地址 | localhost |
| port | RabbitMQ的端口号 | |
| virtual-host | 连接到RabbitMQ的虚拟主机 | |
| username | 登录到RabbitMQ的用户名 | |
| password | 登录到RabbitMQ的密码 | |
| ssl.enabled | 启用SSL支持 | false |
| ssl.key-store | 保存SSL证书的地址 | |
| ssl.key-store-password | 访问SSL证书的地址使用的密码 | |
| ssl.trust-store | SSL的可信地址 | |
| ssl.trust-store-password | 访问SSL的可信地址的密码 | |
| ssl.algorithm | SSL算法,默认使用Rabbit的客户端算法库 | |
| cache.channel.checkout-time | 当缓存已满时,获取Channel的等待时间,单位毫秒 | |
| cache.channel.size | 缓存中保存的Channel数量 | |
| cache.connection.mode | 连接缓存的模式 | CHANNEL |
| cache.connection.size | 缓存的连接数 | |
| connection-timeout | 连接超时参数单位为毫秒,设置为“0”代表无穷大 | |
| dynamic | 默认创建一个AmqpAdmin的Bean | true |
**消息消费类**
| 属性值 | 说明 | 默认值 |
| ---------------------------------------- | ------------------------------------------------------------ | ------- |
| listener.simple.acknowledge-mode | 容器的acknowledge模式 | |
| listener.simple.auto-startup | 启动时自动启动容器 | true |
| listener.simple.concurrency | 消费者的最小数量 | |
| listener.simple.default-requeue-rejected | 投递失败时是否重新排队 | true |
| listener.simple.max-concurrency | 消费者的最大数量 | |
| listener.simple.missing-queues-fatal | 容器上声明的队列不可用时是否失败 | |
| listener.simple.prefetch | 在单个请求中处理的消息个数,他应该大于等于事务数量 | |
| listener.simple.retry.enable | 不论是不是重试的发布 | false |
| listener.simple.retry.initial-interval | 第一次与第二次投递尝试的时间间隔 | 1000ms |
| listener.simple.retry.max-attempts | 尝试投递消息的最大数量 | 3 |
| listener.simple.retry.max-interval | 两次尝试的最大间隔时间 | 10000ms |
| listener.simple.retry.multiplier | 上一次尝试时间间隔的乘数 | 1.0 |
| listener.simple.retry.stateless | 重试是有状态的还是无状态的 | true |
| listener.simple.transaction-size | 在一个事务中处理的消息数量。为了获得最佳效果,该值应设置为小于等于每个请求中处理的消息个数,即listener.simple.prefetch的值 | |
**消息发送类**
| 属性值 | 说明 | 默认值 |
| ------------------------------- | ---------------------------------------- | ------ |
| publisher-confirms | 开启Publisher Confirm机制 | |
| publisher-returns | 开启Publisher Return机制 | |
| template.mandatory | 启用强制信息 | false |
| template.receive-timeout | receive()方法的超时时间 | 0 |
| template.reply-timeout | sendAndReceive()方法的超时时间 | 5000 |
| template.retry.enabled | 设置true的时候RabbitTemplate能够实现重试 | false |
| template.retry.initial-interval | 第一次与第二次发布消息的时间间隔 | 1000 |
| template.retry.max-attempts | 尝试发布消息的最大数量 | 3 |
| template.retry.max-interval | 尝试发布消息的最大时间间隔 | 10000 |
| template.retry.multiplier | 上一次尝试时间间隔的乘数 | 1.0 |
## 可靠性投递
出错的几个可能阶段:
1. 消息从生产者发送到Broker。
2. 消息从Exchange路由到Queue。
3. 消息在Queue中存储的时候。
4. 消费者订阅Queue并消费消息。
### 消息发送到RabbitMQ服务器
造成发送失败的可能原因:可能因为网络连接或者Broker的问题(比如硬盘故障、磁盘写满了)导致消息发送失败,生产者不能确定Broker有没有正确的接收。
RabbitMQ里提供了两种机制**服务端确认机制**,也就是在生产者发送消息给RabbitMQ的服务端的时候,服务端会通过某种方式返回一个应答,只要生产者收到了这个应答,就知道消息发送成功了。
第一种是Transaction(事务)模式,第二种是Commit(提交)模式。
#### Transaction(事务)模式