# rabbitMq基础使用姿势 **Repository Path**: leixiaoquan/rabbitmq_demo ## Basic Information - **Project Name**: rabbitMq基础使用姿势 - **Description**: rabbitMq测试demo代码:直接模式,分列模式,多线程,获取队列数,延迟队列. - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 1 - **Created**: 2020-12-03 - **Last Updated**: 2020-12-19 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## 一. RabbitMQ 简介 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题实现高性能,高可用,可伸缩和最终一致性[架构] 使用较多的消息队列有 ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ ### 主要概念 - RabbitMQ Server: 也叫 broker server,它是一种传输服务。 他的角色就是**维护一条从Producer 到 Consumer 的路线**,保证数据能够按照指定的方式进行传输。 - Producer: **消息生产者**,如图 A、B、C,数据的发送方。消息生产者连接 RabbitMQ 服务器 然后将消息投递到 Exchange。 - Consumer:**消息消费者**,如图 1、2、3,数据的接收方。消息消费者订阅队列,RabbitMQ 将 Queue 中的消息发送到消息消费者。 - Exchange:**生产者将消息发送到 Exchange**(交换器),由 Exchange 将消息路由到一个或多个 Queue 中(或者丢弃)。Exchange 并不存储消息。RabbitMQ 中的 Exchange 有 direct、fanout、topic、headers 四种类型,每种类型对应不同的路由规则。 - Queue:(队列)是 RabbitMQ 的内部对象,用于**存储消息**。消息消费者就是通过订阅队列来获取消息的,RabbitMQ 中的消息都只能存储在 Queue 中,**生产者生产消息并最终投递到Queue 中,消费者可以从 Queue 中获取消息并消费**。多个消费者可以订阅同一个 Queue,这时 Queue 中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。 - RoutingKey:**生产者**在将消息发送给 Exchange 的时候,一般会指定一个 routing key,来**指定这个消息的路由规则**,而这个 routing key 需要与 Exchange Type 及 binding key 联合使用才能最终生效。在 Exchange Type 与 binding key 固定的情况下(在正常使用时一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给 Exchange 时,通过指定 routing key 来决定消息流向哪里。RabbitMQ 为 routing key 设定的长度限制为 255bytes。 - Connection: (连接):Producer 和 Consumer 都是通过 TCP 连接到 RabbitMQ Server的。以后我们可以看到,程序的起始处就是建立这个 TCP 连接。 - Channels: (信道):它建立在上述的 TCP 连接中。数据流动都是在 Channel 中进行的。也就是说,一般情况是程序起始建立 TCP 连接,第二步就是建立这个 Channel。VirtualHost:权限控制的基本单位,一个 VirtualHost 里面有若干 Exchange 和MessageQueue,以及指定被哪些 user 使用 ## 二.docker安装RabbitMq ### RabbitMq部署 ``` # 拉去镜像 docker pull rabbitmq:3-management # 创建挂载目录 mkdir -p /data/soft/rabbit/data # 创建容器 docker run -d --hostname rabbit-host --name rabbitmq -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -v /usr/data/rabbitmq/data:/var/lib/rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:3-management ``` **命令说明:** -d: 后台运行 -p:宿主机与容器端口映射(5672是服务端口映射,15672是控制台端口映射) -v:数据挂载 --hostname:主机名(rabbitmq存储数据是根据“节点名称”存储的,节点名称默认为主机名) -e:指定环境变量(默认用户名、默认密码) 浏览器访问 : `http://192.168.184.134:15672/#/` 账号:user 密码: password ## 三.springboot集成RabbitMQ **项目依赖** ``` org.springframework.boot spring-boot-starter-amqp ``` **YML配置文件** ``` spring: rabbitmq: host: 192.168.11.94 port: 5672 username: user password: password ``` **配置类** ```java import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author linguochao * @Date 2020/7/15 17:49 */ @Configuration public class RabbitMQConfig { //======================直接模式======================================== //队列名称 public static final String QUEUE_INFORM_SMS = "queue_inform_sms"; //声明QUEUE_INFORM_SMS队列 @Bean(QUEUE_INFORM_SMS) public Queue QUEUE_INFORM_SMS(){ return new Queue(QUEUE_INFORM_SMS); } //======================直接模式======================================== //======================分列模式======================================== //队列名称 public static final String QUEUE_DATA_MYSQL = "queue_data_mysql"; public static final String QUEUE_DATA_SOLR = "queue_data_solr"; public static final String EXCHANGE_TOPICS_DATA="exchange_topics_data"; //声明交换机 @Bean(EXCHANGE_TOPICS_DATA) public Exchange EXCHANGE_TOPICS_INFORM(){ //durable(true) 持久化,mq重启之后交换机还在 return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_DATA).durable(true).build(); } //声明QUEUE_DATA_MYSQL队列 @Bean(QUEUE_DATA_MYSQL) public Queue QUEUE_DATA_MYSQL(){ return new Queue(QUEUE_DATA_MYSQL); } //声明QUEUE_DATA_SOLR队列 @Bean(QUEUE_DATA_SOLR) public Queue QUEUE_DATA_SOLR(){ return new Queue(QUEUE_DATA_SOLR); } //QUEUE_DATA_MYSQL队列绑定交换机 @Bean public Binding BINDING_DATA_MYSQL(@Qualifier(QUEUE_DATA_MYSQL) Queue queue, @Qualifier(EXCHANGE_TOPICS_DATA) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("").noargs(); } //QUEUE_DATA_SOLR队列绑定交换机 @Bean public Binding BINDING_DATA_SOLR(@Qualifier(QUEUE_DATA_SOLR) Queue queue, @Qualifier(EXCHANGE_TOPICS_DATA) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("").noargs(); } //======================分列模式======================================== } ``` ### **1. 直接模式(Direct)** 我们需要将消息发给唯一一个节点时使用这种模式,这是最简单的一种形式。 ``` 不需要将 Exchange , 需要一个“RouteKey” ``` **1.1. 配置类声明队列** ```java //队列名称 public static final String QUEUE_INFORM_SMS = "queue_inform_sms"; //声明QUEUE_INFORM_SMS队列 @Bean(QUEUE_INFORM_SMS) public Queue QUEUE_INFORM_SMS(){ return new Queue(QUEUE_INFORM_SMS); } ``` **1.2 消息生产者** ```java @Service public class RabbitProducer { @Autowired private RabbitMessagingTemplate rabbitMessagingTemplate; /** * rabbitmq直接模式,发送短信 * @param mobile */ public void sendsms(String mobile){ Map msg = new HashMap(); msg.put("mobile",mobile); msg.put("code","123456"); rabbitMessagingTemplate.convertAndSend(RabbitMQConfig.QUEUE_INFORM_SMS,msg); System.out.println("mq发送成功"); } } ``` **1.3 消息消费者** ```java @Component public class RabbitComsumer { @RabbitListener(queues = RabbitMQConfig.QUEUE_INFORM_SMS) public void sendMessage(Map msg){ System.out.println("发送短信成功,用户:"+msg); } } ``` ### **2. 分列模式(Fanout)** - `当我们需要将消息一次发给多个队列时,需要使用这种模式。` - `这种模式不需要 RouteKey, 需要提前将 Exchange 与 Queue 进行绑` - `一个 Exchange 可以绑定多个Queue,一个 Queue 可以同多个 Exchange进行绑定。` **2.1 配置类声明队列,交换机,和绑定关系** ```java //======================分列模式======================================== //队列名称 public static final String QUEUE_DATA_MYSQL = "queue_data_mysql"; public static final String QUEUE_DATA_SOLR = "queue_data_solr"; public static final String EXCHANGE_DATA="exchange_data"; //声明交换机 @Bean(EXCHANGE_DATA) public Exchange EXCHANGE_DATA(){ //durable(true) 持久化,mq重启之后交换机还在 return ExchangeBuilder.topicExchange(EXCHANGE_DATA).durable(true).build(); } //声明QUEUE_DATA_MYSQL队列 @Bean(QUEUE_DATA_MYSQL) public Queue QUEUE_DATA_MYSQL(){ return new Queue(QUEUE_DATA_MYSQL); } //声明QUEUE_DATA_SOLR队列 @Bean(QUEUE_DATA_SOLR) public Queue QUEUE_DATA_SOLR(){ return new Queue(QUEUE_DATA_SOLR); } //QUEUE_DATA_MYSQL队列绑定交换机 @Bean public Binding BINDING_DATA_MYSQL(@Qualifier(QUEUE_DATA_MYSQL) Queue queue, @Qualifier(EXCHANGE_DATA) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("").noargs(); } //QUEUE_DATA_SOLR队列绑定交换机 @Bean public Binding BINDING_DATA_SOLR(@Qualifier(QUEUE_DATA_SOLR) Queue queue, @Qualifier(EXCHANGE_DATA) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("").noargs(); } //======================分列模式======================================== ``` **2.2 消息生产者** ```java /** * rabbitmq 分列模式--添加用户 * @param user */ public void adduser(String user){ rabbitMessagingTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_DATA,"",user); System.out.println("添加用户成功:"+user); } ``` **2.3 消息消费者** ```java @RabbitListener(queues = RabbitMQConfig.QUEUE_DATA_MYSQL) public void addMysqlUser(String user){ System.out.println("Mysql入库,用户:"+user); } @RabbitListener(queues = RabbitMQConfig.QUEUE_DATA_SOLR) public void addSolrUser(String user){ System.out.println("Solr存储,用户:"+user); } ``` ### **3.主题模式(Topic)** - `任何发送到 Topic Exchange 的消息都会被转发到所有关心 RouteKey 中指定话题的 Queue 上` - `这种模式需要 RouteKey,也需要提前绑定 Exchange 与 Queue。` - `此类交换器使得来自不同的源头的消息可以到达一个对列,其实说的更明白一点就是模糊匹配的意思` ``` **“#”表示 0 个或若干个关键字,“.”表示一个关键字。** ``` **3.1 配置类** ```java //======================主题模式======================================== //队列名称 public static final String QUEUE_QQ = "mq.topic.qq.queue"; public static final String QUEUE_WX = "mq.topic.wx.queue"; public static final String QUEUE_LOG = "mq.topic.log.queue"; public static final String EXCHANGE_TOPIC="exchange_data"; //声明交换机 @Bean(EXCHANGE_TOPIC) public Exchange EXCHANGE_TOPIC(){ return ExchangeBuilder.topicExchange(EXCHANGE_TOPIC).durable(true).build(); } //声明QUEUE_QQ队列 @Bean(QUEUE_QQ) public Queue QUEUE_QQ(){ return new Queue(QUEUE_QQ); } //声明QUEUE_WX队列 @Bean(QUEUE_WX) public Queue QUEUE_WX(){ return new Queue(QUEUE_WX); } //声明QUEUE_LOG队列 @Bean(QUEUE_LOG) public Queue QUEUE_LOG(){ return new Queue(QUEUE_LOG); } //队列绑定交换机 @Bean public Binding BINDING_TOPIC_QQ(@Qualifier(QUEUE_QQ) Queue queue, @Qualifier(EXCHANGE_TOPIC) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("qq.*").noargs(); } //队列绑定交换机 @Bean public Binding BINDING_TOPIC_WX(@Qualifier(QUEUE_WX) Queue queue, @Qualifier(EXCHANGE_TOPIC) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("wx.*").noargs(); } //队列绑定交换机 @Bean public Binding BINDING_TOPIC_LOG(@Qualifier(QUEUE_LOG) Queue queue, @Qualifier(EXCHANGE_TOPIC) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("*.log").noargs(); } //======================主题模式======================================== ``` **3.2 消息生产者** ```java /** * rabbitmq 主题模式-- qq 微信消息 * @param msg */ public void qqwx(String msg){ rabbitMessagingTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_TOPIC,"qq.log",msg); rabbitMessagingTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_TOPIC,"wx.log",msg); System.out.println("推送qq,微信消息:"+msg); } ``` **3.3 消费者** ```java @RabbitListener(queues = RabbitMQConfig.QUEUE_QQ) public void qq(String msg){ System.out.println("QQ消息:"+msg); } @RabbitListener(queues = RabbitMQConfig.QUEUE_WX) public void wx(String msg){ System.out.println("微信消息:"+msg); } @RabbitListener(queues = RabbitMQConfig.QUEUE_LOG) public void log(String msg){ System.out.println("日志消息:"+msg); } ``` ## 四.RabbitMQ设置多线程处理消息 使用@RabbitListener注解指定消费方法,默认情况是单线程监听队列,可以观察当队列有多个任务时消费端每次只消费一个消息,单线程处理消息容易引起消息处理缓慢,消息堆积,不能最大利用硬件资源 可以配置mq的容器工厂参数,增加并发处理数量即可实现多线程处理监听队列,实现多线程处理消息。 **1、在RabbitmqConfig.java中添加容器工厂配置:** ```java /** * mq的容器工厂参数,增加并发处理数量即可实现多线程处理监听队列 * @param configurer * @param connectionFactory * @return */ @Bean("customContainerFactory") public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConcurrentConsumers(1); //设置线程数 factory.setMaxConcurrentConsumers(3); //最大线程数 configurer.configure(factory, connectionFactory); return factory; } ``` **2、在@RabbitListener注解中指定容器工厂** ``` @RabbitListener(queues = "监听队列名",containerFactory = "customContainerFactory") ``` ## 五.RabbitMq获取队列消息数 **1. 配置类RabbitMQConfig实例RabbitAdmin** ```java /** * 创建 RabbitAdmin 类,这个类封装了对 RabbitMQ 管理端的操作! * * @param connectionFactory * @return */ @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类 rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } ``` **2. 增加rabbitMqUtils工具类** ```java @Component @Slf4j public class RabbitMqUtils { @Resource RabbitAdmin rabbitAdmin; /** * 获取对应队列的数量; * * @param queue * @return */ public int getQueueMessageCount(String queue) { int count=0; try { AMQP.Queue.DeclareOk declareOk = rabbitAdmin.getRabbitTemplate().execute(new ChannelCallback() { @Override public AMQP.Queue.DeclareOk doInRabbit(Channel channel) throws Exception { return channel.queueDeclarePassive(queue); } }); count = declareOk.getMessageCount(); } catch (Exception e) { e.printStackTrace(); } return count; } } ``` **3. 使用** ```java int count = rabbitMqUtils.getQueueMessageCount(RabbitDelayConfig.DELAY_QUEUE_HALF_NAME); ```