# rabbitMq基础使用姿势
**Repository Path**: leixiaoquan/rabbitmq_demo
## Basic Information
- **Project Name**: rabbitMq基础使用姿势
- **Description**: rabbitMq测试demo代码:直接模式,分列模式,多线程,获取队列数,延迟队列.
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 1
- **Created**: 2020-12-03
- **Last Updated**: 2020-12-19
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
## 一. RabbitMQ 简介
消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题实现高性能,高可用,可伸缩和最终一致性[架构] 使用较多的消息队列有 ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ
### 主要概念
- RabbitMQ Server: 也叫 broker server,它是一种传输服务。 他的角色就是**维护一条从Producer 到 Consumer 的路线**,保证数据能够按照指定的方式进行传输。
- Producer: **消息生产者**,如图 A、B、C,数据的发送方。消息生产者连接 RabbitMQ 服务器 然后将消息投递到 Exchange。
- Consumer:**消息消费者**,如图 1、2、3,数据的接收方。消息消费者订阅队列,RabbitMQ 将 Queue 中的消息发送到消息消费者。
- Exchange:**生产者将消息发送到 Exchange**(交换器),由 Exchange 将消息路由到一个或多个 Queue 中(或者丢弃)。Exchange 并不存储消息。RabbitMQ 中的 Exchange 有 direct、fanout、topic、headers 四种类型,每种类型对应不同的路由规则。
- Queue:(队列)是 RabbitMQ 的内部对象,用于**存储消息**。消息消费者就是通过订阅队列来获取消息的,RabbitMQ 中的消息都只能存储在 Queue 中,**生产者生产消息并最终投递到Queue 中,消费者可以从 Queue 中获取消息并消费**。多个消费者可以订阅同一个 Queue,这时 Queue 中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。
- RoutingKey:**生产者**在将消息发送给 Exchange 的时候,一般会指定一个 routing key,来**指定这个消息的路由规则**,而这个 routing key 需要与 Exchange Type 及 binding key 联合使用才能最终生效。在 Exchange Type 与 binding key 固定的情况下(在正常使用时一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给 Exchange 时,通过指定 routing key 来决定消息流向哪里。RabbitMQ 为 routing key 设定的长度限制为 255bytes。
- Connection: (连接):Producer 和 Consumer 都是通过 TCP 连接到 RabbitMQ Server的。以后我们可以看到,程序的起始处就是建立这个 TCP 连接。
- Channels: (信道):它建立在上述的 TCP 连接中。数据流动都是在 Channel 中进行的。也就是说,一般情况是程序起始建立 TCP 连接,第二步就是建立这个 Channel。VirtualHost:权限控制的基本单位,一个 VirtualHost 里面有若干 Exchange 和MessageQueue,以及指定被哪些 user 使用
## 二.docker安装RabbitMq
### RabbitMq部署
```
# 拉去镜像
docker pull rabbitmq:3-management
# 创建挂载目录
mkdir -p /data/soft/rabbit/data
# 创建容器
docker run -d --hostname rabbit-host --name rabbitmq -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -v /usr/data/rabbitmq/data:/var/lib/rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:3-management
```
**命令说明:**
-d: 后台运行
-p:宿主机与容器端口映射(5672是服务端口映射,15672是控制台端口映射)
-v:数据挂载
--hostname:主机名(rabbitmq存储数据是根据“节点名称”存储的,节点名称默认为主机名)
-e:指定环境变量(默认用户名、默认密码)
浏览器访问 : `http://192.168.184.134:15672/#/`
账号:user 密码: password
## 三.springboot集成RabbitMQ
**项目依赖**
```
org.springframework.boot
spring-boot-starter-amqp
```
**YML配置文件**
```
spring:
rabbitmq:
host: 192.168.11.94
port: 5672
username: user
password: password
```
**配置类**
```java
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author linguochao
* @Date 2020/7/15 17:49
*/
@Configuration
public class RabbitMQConfig {
//======================直接模式========================================
//队列名称
public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
//声明QUEUE_INFORM_SMS队列
@Bean(QUEUE_INFORM_SMS)
public Queue QUEUE_INFORM_SMS(){
return new Queue(QUEUE_INFORM_SMS);
}
//======================直接模式========================================
//======================分列模式========================================
//队列名称
public static final String QUEUE_DATA_MYSQL = "queue_data_mysql";
public static final String QUEUE_DATA_SOLR = "queue_data_solr";
public static final String EXCHANGE_TOPICS_DATA="exchange_topics_data";
//声明交换机
@Bean(EXCHANGE_TOPICS_DATA)
public Exchange EXCHANGE_TOPICS_INFORM(){
//durable(true) 持久化,mq重启之后交换机还在
return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_DATA).durable(true).build();
}
//声明QUEUE_DATA_MYSQL队列
@Bean(QUEUE_DATA_MYSQL)
public Queue QUEUE_DATA_MYSQL(){
return new Queue(QUEUE_DATA_MYSQL);
}
//声明QUEUE_DATA_SOLR队列
@Bean(QUEUE_DATA_SOLR)
public Queue QUEUE_DATA_SOLR(){
return new Queue(QUEUE_DATA_SOLR);
}
//QUEUE_DATA_MYSQL队列绑定交换机
@Bean
public Binding BINDING_DATA_MYSQL(@Qualifier(QUEUE_DATA_MYSQL) Queue queue,
@Qualifier(EXCHANGE_TOPICS_DATA) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("").noargs();
}
//QUEUE_DATA_SOLR队列绑定交换机
@Bean
public Binding BINDING_DATA_SOLR(@Qualifier(QUEUE_DATA_SOLR) Queue queue,
@Qualifier(EXCHANGE_TOPICS_DATA) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("").noargs();
}
//======================分列模式========================================
}
```
### **1. 直接模式(Direct)**
我们需要将消息发给唯一一个节点时使用这种模式,这是最简单的一种形式。
```
不需要将 Exchange , 需要一个“RouteKey”
```
**1.1. 配置类声明队列**
```java
//队列名称
public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
//声明QUEUE_INFORM_SMS队列
@Bean(QUEUE_INFORM_SMS)
public Queue QUEUE_INFORM_SMS(){
return new Queue(QUEUE_INFORM_SMS);
}
```
**1.2 消息生产者**
```java
@Service
public class RabbitProducer {
@Autowired
private RabbitMessagingTemplate rabbitMessagingTemplate;
/**
* rabbitmq直接模式,发送短信
* @param mobile
*/
public void sendsms(String mobile){
Map msg = new HashMap();
msg.put("mobile",mobile);
msg.put("code","123456");
rabbitMessagingTemplate.convertAndSend(RabbitMQConfig.QUEUE_INFORM_SMS,msg);
System.out.println("mq发送成功");
}
}
```
**1.3 消息消费者**
```java
@Component
public class RabbitComsumer {
@RabbitListener(queues = RabbitMQConfig.QUEUE_INFORM_SMS)
public void sendMessage(Map msg){
System.out.println("发送短信成功,用户:"+msg);
}
}
```
### **2. 分列模式(Fanout)**
- `当我们需要将消息一次发给多个队列时,需要使用这种模式。`
- `这种模式不需要 RouteKey, 需要提前将 Exchange 与 Queue 进行绑`
- `一个 Exchange 可以绑定多个Queue,一个 Queue 可以同多个 Exchange进行绑定。`
**2.1 配置类声明队列,交换机,和绑定关系**
```java
//======================分列模式========================================
//队列名称
public static final String QUEUE_DATA_MYSQL = "queue_data_mysql";
public static final String QUEUE_DATA_SOLR = "queue_data_solr";
public static final String EXCHANGE_DATA="exchange_data";
//声明交换机
@Bean(EXCHANGE_DATA)
public Exchange EXCHANGE_DATA(){
//durable(true) 持久化,mq重启之后交换机还在
return ExchangeBuilder.topicExchange(EXCHANGE_DATA).durable(true).build();
}
//声明QUEUE_DATA_MYSQL队列
@Bean(QUEUE_DATA_MYSQL)
public Queue QUEUE_DATA_MYSQL(){
return new Queue(QUEUE_DATA_MYSQL);
}
//声明QUEUE_DATA_SOLR队列
@Bean(QUEUE_DATA_SOLR)
public Queue QUEUE_DATA_SOLR(){
return new Queue(QUEUE_DATA_SOLR);
}
//QUEUE_DATA_MYSQL队列绑定交换机
@Bean
public Binding BINDING_DATA_MYSQL(@Qualifier(QUEUE_DATA_MYSQL) Queue queue,
@Qualifier(EXCHANGE_DATA) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("").noargs();
}
//QUEUE_DATA_SOLR队列绑定交换机
@Bean
public Binding BINDING_DATA_SOLR(@Qualifier(QUEUE_DATA_SOLR) Queue queue,
@Qualifier(EXCHANGE_DATA) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("").noargs();
}
//======================分列模式========================================
```
**2.2 消息生产者**
```java
/**
* rabbitmq 分列模式--添加用户
* @param user
*/
public void adduser(String user){
rabbitMessagingTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_DATA,"",user);
System.out.println("添加用户成功:"+user);
}
```
**2.3 消息消费者**
```java
@RabbitListener(queues = RabbitMQConfig.QUEUE_DATA_MYSQL)
public void addMysqlUser(String user){
System.out.println("Mysql入库,用户:"+user);
}
@RabbitListener(queues = RabbitMQConfig.QUEUE_DATA_SOLR)
public void addSolrUser(String user){
System.out.println("Solr存储,用户:"+user);
}
```
### **3.主题模式(Topic)**
- `任何发送到 Topic Exchange 的消息都会被转发到所有关心 RouteKey 中指定话题的 Queue 上`
- `这种模式需要 RouteKey,也需要提前绑定 Exchange 与 Queue。`
- `此类交换器使得来自不同的源头的消息可以到达一个对列,其实说的更明白一点就是模糊匹配的意思`
```
**“#”表示 0 个或若干个关键字,“.”表示一个关键字。**
```
**3.1 配置类**
```java
//======================主题模式========================================
//队列名称
public static final String QUEUE_QQ = "mq.topic.qq.queue";
public static final String QUEUE_WX = "mq.topic.wx.queue";
public static final String QUEUE_LOG = "mq.topic.log.queue";
public static final String EXCHANGE_TOPIC="exchange_data";
//声明交换机
@Bean(EXCHANGE_TOPIC)
public Exchange EXCHANGE_TOPIC(){
return ExchangeBuilder.topicExchange(EXCHANGE_TOPIC).durable(true).build();
}
//声明QUEUE_QQ队列
@Bean(QUEUE_QQ)
public Queue QUEUE_QQ(){
return new Queue(QUEUE_QQ);
}
//声明QUEUE_WX队列
@Bean(QUEUE_WX)
public Queue QUEUE_WX(){
return new Queue(QUEUE_WX);
}
//声明QUEUE_LOG队列
@Bean(QUEUE_LOG)
public Queue QUEUE_LOG(){
return new Queue(QUEUE_LOG);
}
//队列绑定交换机
@Bean
public Binding BINDING_TOPIC_QQ(@Qualifier(QUEUE_QQ) Queue queue,
@Qualifier(EXCHANGE_TOPIC) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("qq.*").noargs();
}
//队列绑定交换机
@Bean
public Binding BINDING_TOPIC_WX(@Qualifier(QUEUE_WX) Queue queue,
@Qualifier(EXCHANGE_TOPIC) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("wx.*").noargs();
}
//队列绑定交换机
@Bean
public Binding BINDING_TOPIC_LOG(@Qualifier(QUEUE_LOG) Queue queue,
@Qualifier(EXCHANGE_TOPIC) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("*.log").noargs();
}
//======================主题模式========================================
```
**3.2 消息生产者**
```java
/**
* rabbitmq 主题模式-- qq 微信消息
* @param msg
*/
public void qqwx(String msg){
rabbitMessagingTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_TOPIC,"qq.log",msg);
rabbitMessagingTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_TOPIC,"wx.log",msg);
System.out.println("推送qq,微信消息:"+msg);
}
```
**3.3 消费者**
```java
@RabbitListener(queues = RabbitMQConfig.QUEUE_QQ)
public void qq(String msg){
System.out.println("QQ消息:"+msg);
}
@RabbitListener(queues = RabbitMQConfig.QUEUE_WX)
public void wx(String msg){
System.out.println("微信消息:"+msg);
}
@RabbitListener(queues = RabbitMQConfig.QUEUE_LOG)
public void log(String msg){
System.out.println("日志消息:"+msg);
}
```
## 四.RabbitMQ设置多线程处理消息
使用@RabbitListener注解指定消费方法,默认情况是单线程监听队列,可以观察当队列有多个任务时消费端每次只消费一个消息,单线程处理消息容易引起消息处理缓慢,消息堆积,不能最大利用硬件资源
可以配置mq的容器工厂参数,增加并发处理数量即可实现多线程处理监听队列,实现多线程处理消息。
**1、在RabbitmqConfig.java中添加容器工厂配置:**
```java
/**
* mq的容器工厂参数,增加并发处理数量即可实现多线程处理监听队列
* @param configurer
* @param connectionFactory
* @return
*/
@Bean("customContainerFactory")
public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConcurrentConsumers(1); //设置线程数
factory.setMaxConcurrentConsumers(3); //最大线程数
configurer.configure(factory, connectionFactory);
return factory;
}
```
**2、在@RabbitListener注解中指定容器工厂**
```
@RabbitListener(queues = "监听队列名",containerFactory = "customContainerFactory")
```
## 五.RabbitMq获取队列消息数
**1. 配置类RabbitMQConfig实例RabbitAdmin**
```java
/**
* 创建 RabbitAdmin 类,这个类封装了对 RabbitMQ 管理端的操作!
*
* @param connectionFactory
* @return
*/
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
// 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
```
**2. 增加rabbitMqUtils工具类**
```java
@Component
@Slf4j
public class RabbitMqUtils {
@Resource
RabbitAdmin rabbitAdmin;
/**
* 获取对应队列的数量;
*
* @param queue
* @return
*/
public int getQueueMessageCount(String queue) {
int count=0;
try {
AMQP.Queue.DeclareOk declareOk = rabbitAdmin.getRabbitTemplate().execute(new ChannelCallback() {
@Override
public AMQP.Queue.DeclareOk doInRabbit(Channel channel) throws Exception {
return channel.queueDeclarePassive(queue);
}
});
count = declareOk.getMessageCount();
} catch (Exception e) {
e.printStackTrace();
}
return count;
}
}
```
**3. 使用**
```java
int count = rabbitMqUtils.getQueueMessageCount(RabbitDelayConfig.DELAY_QUEUE_HALF_NAME);
```