# smart-mq **Repository Path**: hktec/smart-mq ## Basic Information - **Project Name**: smart-mq - **Description**: No description available - **Primary Language**: Unknown - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 0 - **Created**: 2024-01-03 - **Last Updated**: 2024-01-29 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # common-mq ## 项目代码结构如下: ``` [-] common-mq ├──[-] com.xbl.smart.rabbitmq ├──[-] config 配置 ├──[-] consumer 消费者标准 ├──[-] enums 枚举 ├──[-] factory 工厂 ├──[-] model 数据模型 ├──[-] producer 生产者标准 ├──[-] retry 重试 ``` ## 技术栈 | 框架 | 说明 | 版本 | | --- | --- | --- | | RabbitMq | 消息中间件 | 3.8.35 | | Spring | J2EE应用程序框架 | 5.3.25 | | SpringBoot | 微服务框架 | 2.6.3 | | Fastjson | 高性能JSON库 | 1.2.83 | ## 交换机类型说明 ```java /** * 交换机类型枚举 * 配置项为 spring.rabbitmq.modules[0].exchange.type=DIRECT */ public enum RabbitExchangeTypeEnum { /** * 直连交换机 *

* 根据routing-key精准匹配队列(最常使用) */ DIRECT, /** * 主题交换机 *

* 根据routing-key模糊匹配队列,*匹配任意一个字符,#匹配0个或多个字符 */ TOPIC, /** * 扇形交换机 *

* 直接分发给所有绑定的队列,忽略routing-key,用于广播消息 */ FANOUT, /** * 头交换机 *

* 类似直连交换机,不同于直连交换机的路由规则建立在头属性上而不是routing-key(使用较少) */ HEADERS, /** * 延迟交换机 */ DELAYED; } ``` ## 重试模式说明 ```java /** * 配置项为 spring.rabbitmq.modules[0].retry.mode=REQUEUE */ public enum RetryModeEnum { /** * 采用spring retry,自行实现listener */ RETRY, /** * 重投队列,进行消费,若还失败最终投入死信队列 */ REQUEUE; } ``` ## 配置示例 ```yaml spring: rabbitmq: host: 127.0.0.1 port: 5672 username: demo password: pwd virtual-host: /project connection-timeout: 15000 publisher-returns: true # 发送者开启 confirm 确认机制 publisher-confirm-type: correlated template: mandatory: true # 动态创建和绑定队列、交换机的配置 # 项目前缀,默认default systemPrefix: demo # 死信队列名称关键字, 默认dead deadFlag: dead # 配置详细信息 modules: # 普通队列 #路由key - routing-key: warn #消费者类名 consumer: warnConsumer #生产者类名 producer: warnProducerService queue: # 队列名称 name: warn # 死信队列交换机 dead-letter-exchange: dead_warn # 死信队列路由key dead-letter-routing-key: dead_warn # 队列额外参数 arguments: # 1分钟(测试),单位毫秒 x-message-ttl: 60000 exchange: # 交换机名称 name: warn retry: #重试模式 mode: RETRY #最大重试次数 maxAttempts: 1 listener: warnRetry - routing-key: common-delay consumer: orderConsumer producer: orderProducerService queue: name: common-delay dead-letter-exchange: dead_warn dead-letter-routing-key: dead_warn arguments: x-message-ttl: 30000 exchange: name: common-delay #交换机类型,若服务端安装了延迟队列插件,可支持延迟队列 type: DELAYED arguments: x-delayed-type: direct retry: mode: REQUEUE maxAttempts: 2 # 死信队列 - routing-key: dead_warn consumer: deadConsumer queue: name: dead_warn exchange: name: dead_warn # 绑定关系请申明到最末尾 - routing-key: warn queue: name: common-delay exchange: name: warn ``` ## 效果图 ### 启动日志 ![](/imgs/start_01.jpg) ### 交换机 ![](/imgs/exchange_01.jpg) ### 队列 ![](/imgs/queue_01.jpg) ## 生产者 ### 定义生产者 ```java @Component public class OrderProducerService extends AbstractProducer {} ``` ### 发送方法 ``` /** * 发送消息 * @param msg 消息内容 */ void send(Object msg); /** * 发送消息,并设置消息过期时间 * @param msg 消息内容 * @param expire 过期时间(ms) */ void send(Object msg, String expire); /** * 发送延迟消息 * @param msg 消息内容 * @param delay 延迟发送时间(ms) */ void send(Object msg, Integer delay); ``` ### 生产者调用示例 ```java @RestController @RequestMapping("/producer") @Slf4j public class ProducerController { @Resource private WarnProducerService warnProducerService; @Resource private OrderProducerService orderProducerService; @GetMapping("/warn/send") public String warnSend(@RequestParam("msg") String msg) { log.info("warnSend发送消息{}", msg); warnProducerService.send(msg); return "ok"; } @GetMapping("/order/send") public String orderSend(@RequestParam("msg") String msg) { log.info("orderSend发送消息{}", msg); orderProducerService.send(msg, 3000); return "ok"; } } ``` ## 消费者示例 ### 定义消费者 ```java @Component public class OrderConsumer extends AbsConsumerService { @Override public void onConsumer(String data , Message message , Channel channel) throws IOException { //消费动作,若消费失败需重试,请抛出异常 } } ``` ## 重试示例 ```java /** * 若重试模式spring.rabbitmq.modules[0].retry.mode=RETRY时,可自行实现重试逻辑 * 实现 ICustomRetryEvent 接口,并在配置文件中指定 */ @Component @Slf4j public class WarnRetry implements ICustomRetryEvent { @Override public void retry(Message message, Channel channel, RetryContext context, Throwable throwable) { log.info("WarnRetry_重试扩展处理:{}" ,message); } @SneakyThrows @Override public void lastRetry(Message message, Channel channel, RetryContext context, Throwable throwable) { log.info("WarnRetry_重试最后一次扩展处理:{}" ,message); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } } ``` ## maven引用 ```xml com.xbl.smart common-mq 1.0-SNAPSHOT ```