# SpringBootRabbitmq

**Repository Path**: s2689763871_admin/spring-boot-rabbitmq

## Basic Information

- **Project Name**: SpringBootRabbitmq
- **Description**: Learning to use the four basic RabbitMQ queue types
- **Primary Language**: Java
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 0
- **Forks**: 0
- **Created**: 2022-10-28
- **Last Updated**: 2022-11-07

## Categories & Tags

**Categories**: Uncategorized

**Tags**: None

## README

# Using RabbitMQ with Spring Boot

[Official documentation](https://www.rabbitmq.com/tutorials/tutorial-four-java.html) \
[Headers exchange example written by jstobigdata](https://jstobigdata.com/rabbitmq/headers-exchange-in-amqp-rabbitmq/) \
[Mastering RabbitMQ dead letter queues in one article](https://mfrank2016.github.io/breeze-blog/2020/05/04/rabbitmq/rabbitmq-how-to-use-dead-letter-queue/)

## Key Technical Points

1. Declaring the four exchange types and binding queues to them;
2. Sending and receiving messages, and custom message queues;\
   ---- 2022-11-07
3. Added a delayed queue and a dead letter queue

### 1. Declaring and Binding Message Queues

```yaml
spring:
  rabbitmq:
    host: localhost
    port: 5672
    password: admin123
    username: admin
    listener:
      # Use manual acknowledgement
      simple:
        acknowledge-mode: manual
        # Number of messages to prefetch
        prefetch: 1
        # default-requeue-rejected: false
      direct:
        acknowledge-mode: manual
    template:
      mandatory: true
    # Ensure messages can be delivered to the exchange
    publisher-returns: true

rabbitmq:
  fanoutExchange:
    name: fanout.exchange
    queue: 'fanout_queueA,fanout_queueB,fanout_queueC'
  directExchange:
    name: direct.exchange
    queue: direct.equeue
    routingKey: ""
  topicExchange:
    name: topic.exchange
    queue: topic.queue
    # "*" matches one word; "#" matches multiple words
    routingKey: "item.#"
  headerExchange:
    name: header.exchange
    queue: header.queueA,header.queueB,header.queueC
    routingKey: ""
  deadExchange:
    name: dead.exchange
    queue: dead.letter.queue
    routingKey: dead.letter.routingKey
  businessExchange:
    name: business.exchange
    queue: business.queue
    routingKey: ""
  delayedExchange:
    name: delayed.exchange
    queue: delayed.queue
    routingKey: delayed.routingKey
```

**ExchangeConfig**
![img.png](png/img.png)

For the full code, see ExchangeConfig.java. \
After initialization, the management UI shows that the exchanges and queues have been declared and are durable.

**Exchange**
![img_1.png](png/exchange.png)

**Queue**
![img_1.png](png/queue.png)

**Routing keys bound to the queues**
![img_1.png](png/routingKey.png)

### 2. Sending and Receiving Messages

**Listening to a queue with annotations; this example uses point-to-point messaging**

```java
package org.blackdragon.listener;

import com.rabbitmq.client.Channel;
import org.blackdragon.entity.MessageBody;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author black-dragon
 * @version V1.0
 * @Package Queue listener
 * @date 2022/10/21
 * @Copyright
 */
@Component
public class ReceiveListener {

    private static final Logger log = LoggerFactory.getLogger(ReceiveListener.class);

    /**
     * Uses the {@code @RabbitListener} annotation.
     * {@code @Exchange} parameters: name is the exchange name; type is the
     * exchange type, which defaults to direct.
     *
     * @param msg     custom message body
     * @param tag     delivery tag, the unique sequence number of the message
     * @param channel channel the message was delivered on
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitListener(bindings = {@QueueBinding(
            value = @Queue("direct_queue"),
            exchange = @Exchange(name = "directExchange")
    )})
    public void listenerDirect(MessageBody msg,
                               @Header(AmqpHeaders.DELIVERY_TAG) long tag,
                               Channel channel) throws IOException {
        log.info("listenerDirect :tag: {}", tag);
        log.info("channel {}", channel);
        log.info("msg: {} ", msg);
        // Manually acknowledge this delivery only (multiple = false)
        channel.basicAck(tag, false);
    }
}
```

**A custom queue listener must implement ChannelAwareMessageListener**

```java
package org.blackdragon.listener;

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;

import java.nio.charset.StandardCharsets;

import static org.slf4j.LoggerFactory.getLogger;

/**
 * @author black-dragon
 * @version V1.0
 * @Package Custom message listener
 * @date 2022/10/25
 * @Copyright
 */
public class MyListener implements ChannelAwareMessageListener {

    private static final Logger log = getLogger(MyListener.class);

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        log.info("------------ MyListener -----------");
        // Decode the raw bytes for logging; this assumes a UTF-8 text payload
        log.info("message: {}", new String(message.getBody(), StandardCharsets.UTF_8));
        long tag = message.getMessageProperties().getDeliveryTag();
        try {
            // Manually acknowledge this delivery only (multiple = false)
            channel.basicAck(tag, false);
        } catch (Exception e) {
            // Pass the exception itself so the cause and stack trace are logged
            log.error("error", e);
            // Reject this delivery and put it back on the queue (requeue = true)
            channel.basicNack(tag, false, true);
        }
    }
}
```

**RabbitmqConfig configuration**
![img.png](png/listenerConfig.png)

**Log output when using the @RabbitListener annotation**
![img.png](png/directExchangeReceive.png)

**Log output from the custom message listener**
![img.png](png/myListener.png)
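
**Topic routing key matching (sketch)**

The comment on the `topicExchange` routing key in the configuration above summarizes the wildcard rules only briefly. As a rough illustration, the plain-Java class below sketches how a binding key such as `item.#` could be compared with a routing key, following the rule described in the RabbitMQ tutorials: `*` stands for exactly one word and `#` for zero or more words. `TopicKeyMatcher` and `matches` are hypothetical names introduced here; this is neither the broker's actual implementation nor code from this repository.

```java
// Hypothetical helper for illustration only; not part of this repository.
final class TopicKeyMatcher {

    private TopicKeyMatcher() {
    }

    /** Returns true if the dot-separated routing key satisfies the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    // Compares pattern words p[i..] with routing key words k[j..].
    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: succeed only if the key is exhausted too
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' either matches zero words, or consumes one word and stays active
            return matches(p, i + 1, k, j)
                    || (j < k.length && matches(p, i, k, j + 1));
        }
        if (j == k.length) {
            // Pattern still expects a word but the key has none left
            return false;
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            // '*' or a literal match consumes exactly one word
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("item.#", "item.add.detail")); // true
        System.out.println(matches("item.*", "item.add.detail")); // false
        System.out.println(matches("item.#", "order.add"));       // false
    }
}
```

Under these rules, the `item.#` key used for `topic.queue` would accept any routing key whose first word is `item`, while `item.*` would accept only keys with exactly one word after `item`.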