# smart-mq **Repository Path**: hktec/smart-mq ## Basic Information - **Project Name**: smart-mq - **Description**: No description available - **Primary Language**: Unknown - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 0 - **Created**: 2024-01-03 - **Last Updated**: 2024-01-29 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # common-mq ## 项目代码结构如下: ``` [-] common-mq ├──[-] com.xbl.smart.rabbitmq ├──[-] config 配置 ├──[-] consumer 消费者标准 ├──[-] enums 枚举 ├──[-] factory 工厂 ├──[-] model 数据模型 ├──[-] producer 生产者标准 ├──[-] retry 重试 ``` ## 技术栈 | 框架 | 说明 | 版本 | | --- | --- | --- | | RabbitMq | 消息中间件 | 3.8.35 | | Spring | J2EE应用程序框架 | 5.3.25 | | SpringBoot | 微服务框架 | 2.6.3 | | Fastjson | 高性能JSON库 | 1.2.83 | ## 交换机类型说明 ```java /** * 交换机类型枚举 * 配置项为 spring.rabbitmq.modules[0].exchange.type=DIRECT */ public enum RabbitExchangeTypeEnum { /** * 直连交换机 *
* 根据routing-key精准匹配队列(最常使用) */ DIRECT, /** * 主题交换机 *
* 根据routing-key模糊匹配队列,*匹配任意一个字符,#匹配0个或多个字符 */ TOPIC, /** * 扇形交换机 *
* 直接分发给所有绑定的队列,忽略routing-key,用于广播消息 */ FANOUT, /** * 头交换机 *
* 类似直连交换机,不同于直连交换机的路由规则建立在头属性上而不是routing-key(使用较少)
*/
HEADERS,
/**
* 延迟交换机
*/
DELAYED;
}
```
## 重试模式说明
```java
/**
* 配置项为 spring.rabbitmq.modules[0].retry.mode=REQUEUE
*/
public enum RetryModeEnum {
/**
* 采用spring retry,自行实现listener
*/
RETRY,
/**
* 重投队列,进行消费,若还失败最终投入死信队列
*/
REQUEUE;
}
```
## 配置示例
```yaml
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: demo
password: pwd
virtual-host: /project
connection-timeout: 15000
publisher-returns: true
# 发送者开启 confirm 确认机制
publisher-confirm-type: correlated
template:
mandatory: true
# 动态创建和绑定队列、交换机的配置
# 项目前缀,默认default
systemPrefix: demo
# 死信队列名称关键字, 默认dead
deadFlag: dead
# 配置详细信息
modules:
# 普通队列
#路由key
- routing-key: warn
#消费者类名
consumer: warnConsumer
#生产者类名
producer: warnProducerService
queue:
# 队列名称
name: warn
# 死信队列交换机
dead-letter-exchange: dead_warn
# 死信队列路由key
dead-letter-routing-key: dead_warn
# 队列额外参数
arguments:
# 1分钟(测试),单位毫秒
x-message-ttl: 60000
exchange:
# 交换机名称
name: warn
retry:
#重试模式
mode: RETRY
#最大重试次数
maxAttempts: 1
listener: warnRetry
- routing-key: common-delay
consumer: orderConsumer
producer: orderProducerService
queue:
name: common-delay
dead-letter-exchange: dead_warn
dead-letter-routing-key: dead_warn
arguments:
x-message-ttl: 30000
exchange:
name: common-delay
#交换机类型,若服务端安装了延迟队列插件,可支持延迟队列
type: DELAYED
arguments:
x-delayed-type: direct
retry:
mode: REQUEUE
maxAttempts: 2
# 死信队列
- routing-key: dead_warn
consumer: deadConsumer
queue:
name: dead_warn
exchange:
name: dead_warn
# 绑定关系请申明到最末尾
- routing-key: warn
queue:
name: common-delay
exchange:
name: warn
```
## 效果图
### 启动日志

### 交换机

### 队列

## 生产者
### 定义生产者
```java
@Component
public class OrderProducerService extends AbstractProducer {}
```
### 发送方法
```
/**
* 发送消息
* @param msg 消息内容
*/
void send(Object msg);
/**
* 发送消息,并设置消息过期时间
* @param msg 消息内容
* @param expire 过期时间(ms)
*/
void send(Object msg, String expire);
/**
* 发送延迟消息
* @param msg 消息内容
* @param delay 延迟发送时间(ms)
*/
void send(Object msg, Integer delay);
```
### 生产者调用示例
```java
@RestController
@RequestMapping("/producer")
@Slf4j
public class ProducerController {
@Resource
private WarnProducerService warnProducerService;
@Resource
private OrderProducerService orderProducerService;
@GetMapping("/warn/send")
public String warnSend(@RequestParam("msg") String msg) {
log.info("warnSend发送消息{}", msg);
warnProducerService.send(msg);
return "ok";
}
@GetMapping("/order/send")
public String orderSend(@RequestParam("msg") String msg) {
log.info("orderSend发送消息{}", msg);
orderProducerService.send(msg, 3000);
return "ok";
}
}
```
## 消费者示例
### 定义消费者
```java
@Component
public class OrderConsumer extends AbsConsumerService