# tlmqtt
**Repository Path**: guodongAndroid/tlmqtt
## Basic Information
- **Project Name**: tlmqtt
- **Description**: 基于java实现的高并发mqtt的broker,底层使用netty和reactor响应式编程,详细的mqtt3.1.1协议的解析与qos0,1,2的消息。消息持久化。
- **Primary Language**: Java
- **License**: Apache-2.0
- **Default Branch**: main
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 2
- **Created**: 2025-06-09
- **Last Updated**: 2025-07-08
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# tlmqtt
`tlmqtt是一款基于Java开发的轻量级高并发MQTT Broker,采用Netty和Project Reactor实现异步通信,完整支持MQTT 3.1.1协议,包括QoS消息分级、主题通配符、消息持久化等核心功能。项目提供认证(文件/数据库/HTTP)、数据桥接(Kafka/MySQL)和存储(内存/Redis)等可扩展组件,支持MQTT和WebSocket双协议接入。具备生产级特性如SSL加密、会话恢复及高并发处理能力,适用于物联网和实时通信场景。开发者可自定义认证逻辑和存储方案`
## 功能
+ MQTT3.1.1协议自主解析
+ 完整的qos 0,1,2的消息支持
+ 遗嘱消息, 保留消息及消息分发重试
+ SSL方式连接(可选择是否开启)
+ websocket双协议支持
+ 主题过滤
+ 消息的持久化
+ 基于文件,数据库,http接口认证
+ 基于文件的acl订阅/发布权限控制
+ 数据转发功能,目前支持kafka,mysql
## 快速开始
```
io.github.zhsqjm
tlmqtt-core
1.1.0
```
```plain
TlBootstrap bootstrap = new TlBootstrap();
bootstrap
.socket();//开启mqtt协议 默认端口1883
.websocket()//开启websocket协议 默认端口8083
.start();
```
## 后续功能
+ 系统订阅
+ mqtt5.0协议支持
+ 页面展示
+ 集群
### 配置文件说明(common的resources目录下)
```yaml
session:
timeout: 5 #session会话超时时间 如果过了这个时间还没连接 那么就不保持会话
delay: 5 #ack消息确定 5s后没有收到确定就重发
maxRetry: 3 #qos1和qos2的消息重试次数
port:
mqtt: 1883 # mqtt的默认端口
sslMqtt: 8883 # mqtt的ssl端口
websocket: 8083 #websocket的端口
sslWebsocket: 8084 # websocket的ssl端口
ssl:
enabled: true # 是否开确认中,默认开启
certPath: C:\Users\knn\Desktop\fsdownload\cret.crt #证书地址
privatePath: C:\Users\knn\Desktop\fsdownload\private.pem #私钥
auth:
enabled: true #是否开启认证 false就是关闭认证
user: #开启认证后fix的认证信息
- username: watson
password: 12345
- username: zhouhs
password: 12345
# 通道设置
channel:
writeLimit: 104857600 # 全局出站带宽限制:100MB/s
readLimit: 52428800 # 全局入站带宽限制:50MB/s
checkInterval: 1000 #统计周期:1秒
maxTime: 20971520 # `最大突发流量:20MB
lowWaterMark: 65536 # 默认 32768
highWaterMark: 131072 # 默认 65536
#业务线程池队列配置
business:
core: 16 # 核心线程数
max: 32 # 业务线程数
queue: 10000 #任务队列
keepAlive: 60 # 非核心线程数的存活时间
```
### 基础功能
#### 1. 会话的持久化
对于cleansession为0的会话,在```CONNECT```时,会查询上次是否已经存在了该会话,如果存在,那么就会恢复上次会话的状态,如果不存在,那么就创建一个新的会话
#### 2. 通配符匹配
在订阅主题时,可以使用通配符来订阅多个主题。有两种通配符:
+ 单层通配符(+):匹配主题层级中的任意值。例如,home/+/temperature可以匹配home/livingroom/temperature和home/kitchen/temperature。
+ 多层通配符(#):匹配多个层级。例如,home/#可以匹配home/livingroom/temperature和home/kitchen/humidity。
注意:通配符只能用于订阅操作,不能用于发布消息。
#### 3 认证
目前支持文件,http接口以及mysql数据库认证,可同时启用,只要任何一种认证通过即可
##### 3.1 开启或关闭认证。默认开启认证
```java
bootstrap.setAuth(false);
```
##### 3.2 基于文件的认证
+ 声明式
```yaml
auth:
user:
- username: watson
password: 12345
- username: zhouhs
password: 12345
```
+ 编程式
```java
bootstrap.setFixUser(Collections.singletonList(new TlUser("mqtt","mqtt")))
```
##### 3.3 基于http接口
```java
ArrayList httpEntityInfos = new ArrayList<>();
bootstrap.setHttpEntity(httpEntityInfos);
```
##### 3.4 基于mysql数据库认证
```yaml
bootstrap.setSqlEntity(new ArrayList())
```
##### 3.5 自定义认证
继承AbstractTlAuthentication方法
```yaml
public class NoneA extends AbstractTlAuthentication {
@Override
public boolean authenticate(String username, String password) {
return true;
}
@Override
public boolean enabled() {
return true;
}
@Override
public void add(Object object) {
}
}
bootstrap.addAuthentication(new NoneA());
```
#### 4 ACL权限控制(tl-auth的resource目录下)
tlmqtt自定义了一套专属的acl文件格式 并通过初始化时进行解析,具体格式如下
```plain
# 格式说明:类型:值1,值2 | 资源类型:资源 | 操作 | 权限
# 不允许用户名admin客户端订阅a/b主题
user:admin | topic: a/b | sub | deny
# 允许用户名admin客户端订阅a/b主题
user:admin | topic: a/b | sub | allow
# 不允许用户名admin客户端向topic/#主题发布消息
user:admin | topic: topic/# | pub | deny
# 不允许客户端ID为c1和c2客户端订阅a/v主题
client:c1,c2 | topic: a/v | sub | deny
# 不允许ip为127.0.0.1客户端向a/b主题发布消息
ip: 127.0.0.1 | topic: a/b | pub | deny
# 允许任何用户发布订阅任何主题 如果没有匹配到 折都是这条消息
user: * | topic: * | * | allow
```
#### 5. 数据桥接
tlmqtt目前支持将消息转发到mysql和kafka中,并提供接口用于用户自定义数据桥接
##### 5.1 数据库
```java
TlMySqlInfo mySqlInfo = new TlMySqlInfo();
mySqlInfo.setHost("127.0.0.1");
mySqlInfo.setPort(3306);
mySqlInfo.setUsername("root");
mySqlInfo.setPassword("kangni");
mySqlInfo.setDatabase("watson");
mySqlInfo.setTable("mqtt_msg");
mySqlInfo.setDriverClassName("com.mysql.cj.jdbc.Driver");
bootstrap.addBridgeMysql(mySqlInfo);
```
##### 5.2 kafka
```java
TlKafkaInfo kafkaInfo = new TlKafkaInfo("ws", "172.28.33.102:9092",
"org.apache.kafka.common.serialization.StringSerializer",
"org.apache.kafka.common.serialization.StringSerializer");
bootstrap.addBridgeKafka(kafkaInfo);
```
#### 6. 存储
``tlmqtt``默认会话与消息都存储在内存中,当然用户也可以实现接口自定义
+ SessionService 会话存储接口
+ PublishService publish消息与遗嘱消息接口
+ PubRelService pubrel消息接口
+ RetainService 保留消息接口
```java
// 自定义存储为redis
bootstrap.setSessionService(redisSessionService).setPublishService(redisPublishService)
```
#### 7. 保留消息
设置发布的消息为保留消息后,当有客户端订阅这个主题时,就会收到保留消息
#### 8. 遗嘱消息
在客户端非正常断开后 发送遗嘱消息
# 感谢项目
+ [https://github.com/Wizzercn/MqttWk](https://github.com/Wizzercn/MqttWk)
+ [https://github.com/quickmsg/smqttx](https://github.com/quickmsg/smqttx)
`tlmqtt致力于为物联网开发者提供轻量、高效的 MQTT 消息服务,如果您觉得还不错请在右上角点一下 star大家的支持是开源最大动力`