diff --git a/dashboard/components.d.ts b/dashboard/components.d.ts
index 990f1edbbae2aa74f40dc1f9039e3873604a22d9..206cc70aafdac8a4bd3642e531b298da0864588c 100644
--- a/dashboard/components.d.ts
+++ b/dashboard/components.d.ts
@@ -21,6 +21,7 @@ declare module '@vue/runtime-core' {
LayConfigProvider: typeof import('@layui/layui-vue')['LayConfigProvider']
LayContainer: typeof import('@layui/layui-vue')['LayContainer']
LayCountUp: typeof import('@layui/layui-vue')['LayCountUp']
+ LayDatePicker: typeof import('@layui/layui-vue')['LayDatePicker']
LayDropdown: typeof import('@layui/layui-vue')['LayDropdown']
LayDropdownMenu: typeof import('@layui/layui-vue')['LayDropdownMenu']
LayDropdownMenuItem: typeof import('@layui/layui-vue')['LayDropdownMenuItem']
@@ -42,6 +43,7 @@ declare module '@vue/runtime-core' {
LayProgress: typeof import('@layui/layui-vue')['LayProgress']
LayQrcode: typeof import('@layui/layui-vue')['LayQrcode']
LayResult: typeof import('@layui/layui-vue')['LayResult']
+ LayRipple: typeof import('@layui/layui-vue')['LayRipple']
LayRow: typeof import('@layui/layui-vue')['LayRow']
LayScroll: typeof import('@layui/layui-vue')['LayScroll']
LaySelect: typeof import('@layui/layui-vue')['LaySelect']
diff --git a/dashboard/package.json b/dashboard/package.json
index 9e8dc7bf83dcafa070db242fd97cd5ed2a659e66..895897027d35ced40e031149b5ca2ae81a989e1d 100644
--- a/dashboard/package.json
+++ b/dashboard/package.json
@@ -8,7 +8,7 @@
"serve": "vite preview"
},
"dependencies": {
- "@layui/layui-vue": "1.10.0",
+ "@layui/layui-vue": "1.11.4",
"axios": "^1.2.1",
"chart.js": "^4.2.1",
"echarts": "^5.4.1",
diff --git a/dashboard/src/api/module/api.ts b/dashboard/src/api/module/api.ts
index 1762052da4db86bbef076a3c20a1e6d1cea7d2f2..e9a258ee994573fa6c52a89dfaa223871d32ba9c 100644
--- a/dashboard/src/api/module/api.ts
+++ b/dashboard/src/api/module/api.ts
@@ -30,3 +30,8 @@ export const subscriptions_subscription = function (query: {}) {
export const subscriptions_topics = function (query: {}) {
return Http.post('/subscriptions/topics', query)
}
+
+export const system_user_list = function (query: {}) {
+ return Http.post('/system/user/list', query)
+}
+
diff --git a/dashboard/src/layouts/BaseLayout.vue b/dashboard/src/layouts/BaseLayout.vue
index ff1be79b2b5a704476d22314d1107518266d1d81..5932778c7776fa2aa48d6bc4d9de8cda539c4906 100644
--- a/dashboard/src/layouts/BaseLayout.vue
+++ b/dashboard/src/layouts/BaseLayout.vue
@@ -1,168 +1,179 @@
-
-
+
-
-
-
-
- smart-mqtt
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
-
-
-
-
-
-
-
- {{ menu.title }}
-
-
-
-
-
-
-
-
- {{ menu.title }}
-
-
-
-
-
-
-
-
+
+ smart-mqtt
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ {{ menu.title }}
+
+
+
+
+
+
+
+
+
+ {{ menu.title }}
+
+
+
+
+
+
+
+
+
-
-
-
-
-
-
-
- 选项一
- 选项二
- 选项三
-
-
-
-
-
-
-
-
-
-
-
-
-
- 用户信息
-
-
- 系统设置
-
-
-
- 注销登录
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+ >
+
+
+
+
+
+
+
+
+
+
+ 授权对象:sxxxx
+
+
+
+
+ 有效期:aaaa
+
+
+
+
+ 选项二
+
+
+
+
+
+
+
+
+
+
+
+ 用户信息
+
+
+ 系统设置
+
+
+
+ 注销登录
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/dashboard/src/mockjs/user.ts b/dashboard/src/mockjs/user.ts
index 26d8805984417e4de20dab826226975a7cb5f9a1..6969196fd5a92c7ee023495545c87bdb7afba11f 100644
--- a/dashboard/src/mockjs/user.ts
+++ b/dashboard/src/mockjs/user.ts
@@ -52,6 +52,23 @@ const menus = [
icon: "layui-icon-senior",
title: "ChatMQTT"
},
+ {
+ id: "/system",
+ icon: "layui-icon-set",
+ title: "系统设置",
+ children:[
+ {
+ id: "/system/user",
+ title: "用户",
+ icon:"layui-icon-group"
+ },
+ {
+ id: "/system/setting",
+ title: "设置",
+ icon: "layui-icon-set-sm"
+ }
+ ]
+ },
// {
//
// id: "/chatGPT",
diff --git a/dashboard/src/router/module/base-routes.ts b/dashboard/src/router/module/base-routes.ts
index ee358685cc0c9635e1954a51c8f386aa214dfbb3..4047a9df0cfdce3d752651cb7d388c88a9c2e1f3 100644
--- a/dashboard/src/router/module/base-routes.ts
+++ b/dashboard/src/router/module/base-routes.ts
@@ -74,6 +74,24 @@ export default [
},
]
},
+ {
+ path: '/system',
+ // redirect: "/dashboard/overview",
+ component: BaseLayout,
+ children: [
+ {
+ path: '/system/user',
+ component: () => import('../../views/System/user.vue'),
+ meta: {title: '用户', requireAuth: true},
+ },
+ {
+ path: '/system/setting',
+ component: () => import('../../views/System/setting.vue'),
+ meta: {title: '设置', requireAuth: true},
+ },
+ ]
+
+ },
// {
// path: '/chatGPT',
// component: BaseLayout,
diff --git a/dashboard/src/views/Dashboard/overview.vue b/dashboard/src/views/Dashboard/overview.vue
index 58b949531bd965e0783115d03256ca502b0406e2..a5eddb9e163d3ea171b039f8c981d902d8cdeb2c 100644
--- a/dashboard/src/views/Dashboard/overview.vue
+++ b/dashboard/src/views/Dashboard/overview.vue
@@ -1,9 +1,4 @@
-
-
-
-
-
@@ -132,6 +127,13 @@ export default {
const topicCountChart = ref();
const period_message_received_chart = ref();
const period_message_sent_chart = ref();
+ const license=ref({
+ username:"",
+ password:"",
+ desc:"",
+ datetime:[]
+
+ });
const options = ref({
responsive: true,
maintainAspectRatio: false
@@ -211,7 +213,8 @@ export default {
period_message_sent_chart,
message,
metric,
- options
+ options,
+ license
}
}
}
diff --git a/dashboard/src/views/System/setting.vue b/dashboard/src/views/System/setting.vue
new file mode 100644
index 0000000000000000000000000000000000000000..3a8d9e0658014061d5eaefe946980ea706b9fd4e
--- /dev/null
+++ b/dashboard/src/views/System/setting.vue
@@ -0,0 +1,69 @@
+
+
+
+
+
+
+
+
+
+
+
+ 选择Dashboard页面内展示的语言
+
+
+
+
+
+
+
+
+ 提交
+ 重置
+
+
+
+
+
\ No newline at end of file
diff --git a/dashboard/src/views/System/user.vue b/dashboard/src/views/System/user.vue
new file mode 100644
index 0000000000000000000000000000000000000000..57b5831ce75b242ca7346dd2d3a26112328c7947
--- /dev/null
+++ b/dashboard/src/views/System/user.vue
@@ -0,0 +1,120 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ 查询
+ 新增
+
+
+
+
+
+
+
+
+
+
+ 编辑
+ 修改密码
+ 删除
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dashboard/vite.config.ts b/dashboard/vite.config.ts
index e13d05e73dad8be5a2853ac9855d23ecbc3160b8..e00bebd66ac319efd365b09ffe4f83094c8512ac 100644
--- a/dashboard/vite.config.ts
+++ b/dashboard/vite.config.ts
@@ -11,8 +11,8 @@ export default defineConfig({
server:{
proxy:{
'/api': {
- // target: 'http://127.0.0.1:18083/api/',
- target: 'http://82.157.162.230:8083/api/',
+ target: 'http://127.0.0.1:18083/api/',
+ // target: 'http://82.157.162.230:8083/api/',
changeOrigin: true,
rewrite: path => path.replace(/^\/api/, '')
}
diff --git a/docker-compose.yml b/docker-compose.yml
index 66659d7a342b3c2ffc645be2d78fd0720c3ee5ab..dc762a834ec94f4f0c6ef942d774777b44c6ffd0 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -24,7 +24,7 @@ services:
# mqtt-broker:
# container_name: emqx
# hostname: mqtt-broker
-# image: emqx/emqx:5.0.3
+# image: emqx/emqx:5.0.24
# networks:
# mqtt-network: null
# restart: always
@@ -53,6 +53,6 @@ services:
options:
max-size: "100m"
max-file: "1"
- command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=2000 -Dqos=2 -Dpublisher=10 -Dcount=1 -Dpayload=128 org.smartboot.bench.mqtt.Subscribe
-# command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=2000 -Dqos=0 -Dcount=10 -Dpayload=128 org.smartboot.bench.mqtt.Publish
+# command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=2000 -Dqos=2 -Dpublisher=10 -Dcount=1 -Dpayload=128 org.smartboot.bench.mqtt.Subscribe
+ command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=2000 -Dqos=0 -Dcount=10 -Dpayload=128 org.smartboot.bench.mqtt.Publish
version: '3.7'
\ No newline at end of file
diff --git a/plugins/pom.xml b/plugins/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..24bee1331b7cd8f316cf93888bbb61e77d367947
--- /dev/null
+++ b/plugins/pom.xml
@@ -0,0 +1,49 @@
+
+
+
+
+ 4.0.0
+
+ org.smartboot.mqtt
+ smart-mqtt
+ 0.20
+ ../pom.xml
+
+ pom
+
+ plugins
+
+
+ org.smartboot.mqtt
+ smart-mqtt-broker
+
+
+
+
+
+ org.smartboot.mqtt
+ redis-bridge-plugin
+ ${parent.version}
+
+
+ redis.clients
+ jedis
+ 4.3.1
+
+
+
+
+
+ redis-bridge-plugin
+
+
\ No newline at end of file
diff --git a/plugins/redis-bridge-plugin/pom.xml b/plugins/redis-bridge-plugin/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..a89708bce7e8fa9b05a3d968b2af0de5fbbc8ed4
--- /dev/null
+++ b/plugins/redis-bridge-plugin/pom.xml
@@ -0,0 +1,22 @@
+
+
+ plugins
+ org.smartboot.mqtt
+ 0.20
+ ../pom.xml
+
+ 4.0.0
+
+ redis-bridge-plugin
+
+ redis-bridge-plugin
+
+
+
+ redis.clients
+ jedis
+
+
+
+
diff --git a/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/config/DataSourcePluginConfig.java b/plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/Config.java
similarity index 41%
rename from smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/config/DataSourcePluginConfig.java
rename to plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/Config.java
index 0514107a9bcc5c025b3fbe0cad41227a8275709c..ee160ec4102eda41c2b33d6975e42453a20bba88 100644
--- a/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/config/DataSourcePluginConfig.java
+++ b/plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/Config.java
@@ -1,37 +1,63 @@
-package org.smartboot.mqtt.data.persistence.config;
+/*
+ * Copyright (C) [2022] smartboot [zhengjunweimail@163.com]
+ *
+ * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。
+ *
+ * Enterprise users are required to use this project reasonably
+ * and legally in accordance with the AGPL-3.0 open source agreement
+ * without special permission from the smartboot organization.
+ */
+package org.smartboot.mqtt.bridge.redis;
-import org.smartboot.mqtt.broker.config.PluginConfig;
-public class DataSourcePluginConfig extends PluginConfig {
+class Config {
+ private String host;
+ private int port;
private String password;
-
+
private int timeout = 1000;
private boolean base64 = false;
-
-
+
+
public String getPassword() {
return password;
}
-
+
public void setPassword(String password) {
this.password = password;
}
-
+
public int getTimeout() {
return timeout;
}
-
+
public void setTimeout(int timeout) {
this.timeout = timeout;
}
-
+
public boolean isBase64() {
return base64;
}
-
+
public void setBase64(boolean base64) {
this.base64 = base64;
}
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
}
diff --git a/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/impl/RedisPlugin.java b/plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/RedisPlugin.java
similarity index 76%
rename from smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/impl/RedisPlugin.java
rename to plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/RedisPlugin.java
index e4d35420d94553d00100df3504694bf9516e571d..a873e1c48205bd3434390fe4f54e7bc43f04a25d 100644
--- a/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/impl/RedisPlugin.java
+++ b/plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/RedisPlugin.java
@@ -1,16 +1,25 @@
-package org.smartboot.mqtt.data.persistence.impl;
+/*
+ * Copyright (C) [2022] smartboot [zhengjunweimail@163.com]
+ *
+ * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。
+ *
+ * Enterprise users are required to use this project reasonably
+ * and legally in accordance with the AGPL-3.0 open source agreement
+ * without special permission from the smartboot organization.
+ */
+
+package org.smartboot.mqtt.bridge.redis;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.smartboot.mqtt.bridge.redis.handler.BrokerHandler;
+import org.smartboot.mqtt.bridge.redis.nodeinfo.MessageNodeInfo;
import org.smartboot.mqtt.broker.BrokerContext;
import org.smartboot.mqtt.broker.eventbus.ServerEventType;
import org.smartboot.mqtt.broker.eventbus.messagebus.MessageBus;
+import org.smartboot.mqtt.broker.plugin.Plugin;
import org.smartboot.mqtt.broker.plugin.PluginException;
import org.smartboot.mqtt.common.eventbus.EventBus;
-import org.smartboot.mqtt.data.persistence.DataPersistPlugin;
-import org.smartboot.mqtt.data.persistence.config.DataSourcePluginConfig;
-import org.smartboot.mqtt.data.persistence.handler.BrokerHandler;
-import org.smartboot.mqtt.data.persistence.nodeinfo.MessageNodeInfo;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
@@ -20,7 +29,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-public class RedisPlugin extends DataPersistPlugin {
+public class RedisPlugin extends Plugin {
private static final Logger LOGGER = LoggerFactory.getLogger(RedisPlugin.class);
private static final String CONFIG_JSON_PATH = "$['plugins']['redis-bridge'][0]";
private static final String CRTEATE_TIME_FIELD_NAME = "createTime";
@@ -28,31 +37,32 @@ public class RedisPlugin extends DataPersistPlugin {
private static final String DEFALUT_REDIS_BROKER_KEY_FIELD = "name";
private static final Lock lock = new ReentrantLock();
private static JedisPool jedisPool = null;
-
+
+ private Config config;
+
@Override
protected void initPlugin(BrokerContext brokerContext) {
- DataSourcePluginConfig config = brokerContext.parseConfig(CONFIG_JSON_PATH, DataSourcePluginConfig.class);
+ config = brokerContext.parseConfig(CONFIG_JSON_PATH, Config.class);
if (config == null) {
LOGGER.error("config maybe error, parse fail!");
throw new PluginException("start DataPersistRedisPlugin exception");
}
- this.setConfig(config);
-
+
// 完成redis线程池的
if (jedisPool == null) {
lock.lock();
try {
- if (jedisPool == null){
+ if (jedisPool == null) {
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(10); // 最大连接数
poolConfig.setMaxIdle(2); // 最大空闲连接数
poolConfig.setTestOnReturn(false);
poolConfig.setTestOnBorrow(true); // 检查连接可用性, 确保获取的redis实例可用
poolConfig.setTestOnCreate(false);
- jedisPool = new JedisPool(poolConfig, config.getHost(), config.getPort(),config.getTimeout(), config.getPassword());
+ jedisPool = new JedisPool(poolConfig, config.getHost(), config.getPort(), config.getTimeout(), config.getPassword());
LOGGER.info("redisPoll create success");
}
- }finally {
+ } finally {
lock.unlock();
}
}
@@ -62,25 +72,24 @@ public class RedisPlugin extends DataPersistPlugin {
Jedis resource = jedisPool.getResource();
Map handler = BrokerHandler.handler(brokerContext);
String result = resource.hget(handler.get(DEFALUT_REDIS_BROKER_KEY_FIELD), CRTEATE_TIME_FIELD_NAME);
- if (result == null){
- handler.put(CRTEATE_TIME_FIELD_NAME,handler.get(RECENT_TIME_FIELD_NAME));
+ if (result == null) {
+ handler.put(CRTEATE_TIME_FIELD_NAME, handler.get(RECENT_TIME_FIELD_NAME));
}
- resource.hmset(handler.get(DEFALUT_REDIS_BROKER_KEY_FIELD),handler);
+ resource.hmset(handler.get(DEFALUT_REDIS_BROKER_KEY_FIELD), handler);
jedisPool.returnResource(resource);
});
-
+
// 消息总线监听
MessageBus messageBus = brokerContext.getMessageBus();
messageBus.consumer((brokerContext1, publishMessage) -> {
Jedis resource = jedisPool.getResource();
- String message = new MessageNodeInfo(publishMessage).toString(this.getConfig().isBase64());
- resource.lpush(brokerContext1.getBrokerConfigure().getName() + ":" + publishMessage.getVariableHeader().getTopicName(),message);
+ String message = new MessageNodeInfo(publishMessage).toString(config.isBase64());
+ resource.lpush(brokerContext1.getBrokerConfigure().getName() + ":" + publishMessage.getVariableHeader().getTopicName(), message);
jedisPool.returnResource(resource);
});
}
-
-
+
@Override
protected void destroyPlugin() {
lock.lock();
diff --git a/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/handler/BrokerHandler.java b/plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/handler/BrokerHandler.java
similarity index 77%
rename from smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/handler/BrokerHandler.java
rename to plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/handler/BrokerHandler.java
index 08823065eac9ebc29c7614282ee40966f1d2ca43..00318a1aa13cebceb4c9d45f219bdc078a01550e 100644
--- a/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/handler/BrokerHandler.java
+++ b/plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/handler/BrokerHandler.java
@@ -1,10 +1,20 @@
-package org.smartboot.mqtt.data.persistence.handler;
+/*
+ * Copyright (C) [2022] smartboot [zhengjunweimail@163.com]
+ *
+ * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。
+ *
+ * Enterprise users are required to use this project reasonably
+ * and legally in accordance with the AGPL-3.0 open source agreement
+ * without special permission from the smartboot organization.
+ */
+
+package org.smartboot.mqtt.bridge.redis.handler;
import com.sun.management.OperatingSystemMXBean;
+import org.smartboot.mqtt.bridge.redis.nodeinfo.BrokerNodeInfo;
import org.smartboot.mqtt.broker.BrokerConfigure;
import org.smartboot.mqtt.broker.BrokerContext;
import org.smartboot.mqtt.broker.BrokerRuntime;
-import org.smartboot.mqtt.data.persistence.nodeinfo.BrokerNodeInfo;
import java.lang.management.ManagementFactory;
import java.text.SimpleDateFormat;
diff --git a/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/nodeinfo/BrokerNodeInfo.java b/plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/nodeinfo/BrokerNodeInfo.java
similarity index 82%
rename from smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/nodeinfo/BrokerNodeInfo.java
rename to plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/nodeinfo/BrokerNodeInfo.java
index 907d5fe0bb8d0e054f9bd2809be72b9033d7854c..1dc025af6be5f100876ae20a8223dd3510d4e6bc 100644
--- a/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/nodeinfo/BrokerNodeInfo.java
+++ b/plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/nodeinfo/BrokerNodeInfo.java
@@ -1,4 +1,14 @@
-package org.smartboot.mqtt.data.persistence.nodeinfo;
+/*
+ * Copyright (C) [2022] smartboot [zhengjunweimail@163.com]
+ *
+ * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。
+ *
+ * Enterprise users are required to use this project reasonably
+ * and legally in accordance with the AGPL-3.0 open source agreement
+ * without special permission from the smartboot organization.
+ */
+
+package org.smartboot.mqtt.bridge.redis.nodeinfo;
import com.alibaba.fastjson2.JSON;
diff --git a/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/nodeinfo/MessageNodeInfo.java b/plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/nodeinfo/MessageNodeInfo.java
similarity index 79%
rename from smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/nodeinfo/MessageNodeInfo.java
rename to plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/nodeinfo/MessageNodeInfo.java
index 010a36bac4c6fdc898892a308a2238929017c471..cc41fbaa494dc6266ae5ef07f895a8b62bdafcbe 100644
--- a/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/nodeinfo/MessageNodeInfo.java
+++ b/plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/nodeinfo/MessageNodeInfo.java
@@ -1,4 +1,14 @@
-package org.smartboot.mqtt.data.persistence.nodeinfo;
+/*
+ * Copyright (C) [2022] smartboot [zhengjunweimail@163.com]
+ *
+ * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。
+ *
+ * Enterprise users are required to use this project reasonably
+ * and legally in accordance with the AGPL-3.0 open source agreement
+ * without special permission from the smartboot organization.
+ */
+
+package org.smartboot.mqtt.bridge.redis.nodeinfo;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
diff --git a/plugins/redis-bridge-plugin/src/main/resources/META-INF/services/org.smartboot.mqtt.broker.plugin.Plugin b/plugins/redis-bridge-plugin/src/main/resources/META-INF/services/org.smartboot.mqtt.broker.plugin.Plugin
new file mode 100644
index 0000000000000000000000000000000000000000..993ce6eab42a60714b34ba37e22c168bc9707077
--- /dev/null
+++ b/plugins/redis-bridge-plugin/src/main/resources/META-INF/services/org.smartboot.mqtt.broker.plugin.Plugin
@@ -0,0 +1 @@
+org.smartboot.mqtt.bridge.redis.RedisPlugin
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 8e5c86a003feedec69a977d10c0984f3e0f439ed..cacb49b895c2225db9da774b43fe88c3d99471c4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,13 +4,13 @@
org.smartboot.mqtt
smart-mqtt
smart-mqtt
- 0.19
+ 0.20
4.0.0
mqtt broker
- 0.19
- 1.5.26
+ 0.20
+ 1.5.27
1.1.22
2.6
4.3
@@ -43,6 +43,11 @@
smart-mqtt-client
${smart.mqtt.version}
+
+ org.smartboot.mqtt
+ plugins
+ ${smart.mqtt.version}
+
commons-lang
commons-lang
@@ -56,7 +61,7 @@
org.yaml
snakeyaml
- 1.32
+ 2.0
com.alibaba.fastjson2
@@ -276,5 +281,6 @@
smart-mqtt-broker
smart-mqtt-common
smart-mqtt-client
+ plugins
\ No newline at end of file
diff --git a/smart-mqtt-broker/pom.xml b/smart-mqtt-broker/pom.xml
index 238477629a38cac025e9ec397ccc3e4eb4700982..e50e103d6e342f79e62b2f98a4b79f75a771de80 100644
--- a/smart-mqtt-broker/pom.xml
+++ b/smart-mqtt-broker/pom.xml
@@ -5,7 +5,7 @@
org.smartboot.mqtt
smart-mqtt
- 0.19
+ 0.20
../pom.xml
4.0.0
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java
index f26c8480823b55771262995a55e1c11a89e671b7..caa7f048ecd297e3113f5d880a02f1c965af34c9 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java
@@ -38,7 +38,7 @@ public class BrokerConfigure extends ToString {
/**
* 当前smart-mqtt
*/
- public static final String VERSION = "v0.19";
+ public static final String VERSION = "v0.20";
static final Map SystemEnvironments = new HashMap<>();
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java
index 8b84a61ba6153303aebaeb70561be198d759e31a..8b61bb1632f865c8cca1c151e550eba3bb6f9854 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java
@@ -29,6 +29,7 @@ import org.smartboot.mqtt.common.AsyncTask;
import org.smartboot.mqtt.common.InflightQueue;
import org.smartboot.mqtt.common.QosRetryPlugin;
import org.smartboot.mqtt.common.enums.MqttMetricEnum;
+import org.smartboot.mqtt.common.enums.MqttQoS;
import org.smartboot.mqtt.common.enums.MqttVersion;
import org.smartboot.mqtt.common.eventbus.EventBus;
import org.smartboot.mqtt.common.eventbus.EventBusImpl;
@@ -356,8 +357,12 @@ public class BrokerContextImpl implements BrokerContext {
BrokerTopic topic = subscriber.getTopic();
topic.getQueue().offer(subscriber);
notifyPush(topic);
-
- //完成retain消息的消费,正式开始监听Topic
+//
+// int preVersion = subscriber.getTopic().getVersion().get();
+// subscriber.batchPublish(BrokerContextImpl.this);
+// if (preVersion != subscriber.getTopic().getVersion().get()) {
+// notifyPush(subscriber.getTopic());
+// }
return;
}
//retain采用严格顺序publish模式
@@ -367,8 +372,15 @@ public class BrokerContextImpl implements BrokerContext {
if (session.getMqttVersion() == MqttVersion.MQTT_5) {
publishBuilder.publishProperties(new PublishProperties());
}
- InflightQueue inflightQueue = session.getInflightQueue();
long offset = storedMessage.getOffset();
+ // Qos0不走飞行窗口
+ if (subscriber.getMqttQoS() == MqttQoS.AT_MOST_ONCE) {
+ subscriber.setRetainConsumerOffset(offset + 1);
+ session.write(publishBuilder.build());
+ retainPushThreadPool.execute(task);
+ return;
+ }
+ InflightQueue inflightQueue = session.getInflightQueue();
// retain消息逐个推送
inflightQueue.offer(publishBuilder, (mqtt) -> {
LOGGER.info("publish retain to client:{} success ", session.getClientId());
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java
index e0239b09012b0ea582ee3cedf0e595fd88053c10..d781f8b17304b6376ca2c5688daf55f04937d2f6 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java
@@ -96,6 +96,9 @@ public class MqttBrokerMessageProcessor extends AbstractMessageProcessor> userList() {
+ return RestResult.fail(OpenApi.MESSAGE_UPGRADE);
+ }
+}
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java
index 1710700d40966a8b87b95dc4b6fe2bbe1e1c41ac..309437f056f219514842b558a8e661f2bf1e105f 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/ConnectProcessor.java
@@ -185,7 +185,7 @@ public class ConnectProcessor implements MqttProcessor {
return;
}
WillMessage willMessage = msg.getPayload().getWillMessage();
- MqttMessageBuilders.PublishBuilder publishBuilder = MqttMessageBuilders.publish().topicName(willMessage.getWillTopic()).qos(MqttQoS.valueOf(msg.getVariableHeader().willQos())).payload(willMessage.getWillMessage()).retained(msg.getFixedHeader().isRetain());
+ MqttMessageBuilders.PublishBuilder publishBuilder = MqttMessageBuilders.publish().topicName(willMessage.getTopic()).qos(MqttQoS.valueOf(msg.getVariableHeader().willQos())).payload(willMessage.getPayload()).retained(msg.getFixedHeader().isRetain());
//todo
if (session.getMqttVersion() == MqttVersion.MQTT_5) {
publishBuilder.publishProperties(new PublishProperties());
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/MqttAckProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/MqttAckProcessor.java
index 6196f641dce5bdf881c4e0443fc5be0674577bf1..d353df689bd2ecdb05636693237b34ab691011f0 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/MqttAckProcessor.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/MqttAckProcessor.java
@@ -12,13 +12,13 @@ package org.smartboot.mqtt.broker.processor;
import org.smartboot.mqtt.broker.BrokerContext;
import org.smartboot.mqtt.broker.MqttSession;
-import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage;
+import org.smartboot.mqtt.common.message.MqttPubQosMessage;
/**
* @author 三刀(zhengjunweimail@163.com)
* @version V1.0 , 2022/4/15
*/
-public class MqttAckProcessor extends AuthorizedMqttProcessor {
+public class MqttAckProcessor extends AuthorizedMqttProcessor {
@Override
public void process0(BrokerContext context, MqttSession session, T t) {
session.getInflightQueue().notify(t);
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/MqttProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/MqttProcessor.java
index 9c6495e62edd40d166e4fb3f7a0dcc57afb5aba8..72632386e8a002f88958821a91e37137d67218a8 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/MqttProcessor.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/MqttProcessor.java
@@ -22,6 +22,7 @@ public interface MqttProcessor {
/**
* 处理Mqtt消息
+ *
* @param context
* @param session
* @param t
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/SubscribeProvider.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/SubscribeProvider.java
index 4d3f99f224e562c81e1d59e3859520f43a898c73..21b517b4d11a8c170882d8f7527f52118b1577ad 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/SubscribeProvider.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/provider/SubscribeProvider.java
@@ -14,6 +14,7 @@ import org.smartboot.mqtt.broker.MqttSession;
/**
* Topic订阅
+ *
* @author 三刀(zhengjunweimail@163.com)
* @version V1.0 , 2022/12/28
*/
diff --git a/smart-mqtt-client/pom.xml b/smart-mqtt-client/pom.xml
index 1da20a7d84c1069540582c412cf581778e8b4d67..b93a632bcb280a36d0d58843226da751386bd6d9 100644
--- a/smart-mqtt-client/pom.xml
+++ b/smart-mqtt-client/pom.xml
@@ -5,7 +5,7 @@
smart-mqtt
org.smartboot.mqtt
- 0.19
+ 0.20
../pom.xml
4.0.0
diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java
index a8688d273ce0a6b81ea2dca19098846ca2cac55f..be39b3cfe0f5cb4a1f5df3d096f78a6d08660a50 100644
--- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java
+++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java
@@ -42,6 +42,7 @@ import org.smartboot.mqtt.common.message.variable.properties.ConnectProperties;
import org.smartboot.mqtt.common.message.variable.properties.PublishProperties;
import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties;
import org.smartboot.mqtt.common.message.variable.properties.SubscribeProperties;
+import org.smartboot.mqtt.common.message.variable.properties.WillProperties;
import org.smartboot.mqtt.common.protocol.MqttProtocol;
import org.smartboot.mqtt.common.util.MqttMessageBuilders;
import org.smartboot.mqtt.common.util.ValidateUtils;
@@ -68,6 +69,8 @@ import java.util.function.Consumer;
public class MqttClient extends AbstractSession {
private static final Logger LOGGER = LoggerFactory.getLogger(MqttClient.class);
+ private static final Consumer IGNORE = integer -> {
+ };
/**
* 客户端配置项
*/
@@ -373,35 +376,52 @@ public class MqttClient extends AbstractSession {
* 设置遗嘱消息,必须在connect之前调用
*/
public MqttClient willMessage(WillMessage willMessage) {
- if (clientConfigure.getMqttVersion() != MqttVersion.MQTT_5 && willMessage != null && willMessage.getProperties() != null) {
+ ValidateUtils.notNull(willMessage, "willMessage can't be null");
+ if (clientConfigure.getMqttVersion() != MqttVersion.MQTT_5 && willMessage.getProperties() != null) {
ValidateUtils.throwException("will properties only support on mqtt5");
+ } else if (clientConfigure.getMqttVersion() == MqttVersion.MQTT_5 && willMessage.getProperties() == null) {
+ willMessage.setProperties(new WillProperties());
}
clientConfigure.setWillMessage(willMessage);
return this;
}
+ public void publish(String topic, MqttQoS qos, byte[] payload) {
+ publish(topic, qos, payload, false, true);
+ }
+
public void publish(String topic, MqttQoS qos, byte[] payload, boolean retain) {
- publish(topic, qos, payload, retain, integer -> {
+ publish(topic, qos, payload, retain, true);
+ }
- });
+ public void publish(String topic, MqttQoS qos, byte[] payload, boolean retain, boolean autoFlush) {
+ publish(topic, qos, payload, retain, IGNORE, autoFlush);
+ }
+
+ public void publish(String topic, MqttQoS qos, byte[] payload, Consumer consumer) {
+ publish(topic, qos, payload, false, consumer, true);
}
public void publish(String topic, MqttQoS qos, byte[] payload, boolean retain, Consumer consumer) {
+ publish(topic, qos, payload, retain, consumer, true);
+ }
+
+ public void publish(String topic, MqttQoS qos, byte[] payload, boolean retain, Consumer consumer, boolean autoFlush) {
MqttMessageBuilders.PublishBuilder publishBuilder = MqttMessageBuilders.publish().topicName(topic).qos(qos).payload(payload).retained(retain);
//todo
if (getMqttVersion() == MqttVersion.MQTT_5) {
publishBuilder.publishProperties(new PublishProperties());
}
if (connected) {
- publish(publishBuilder, consumer);
+ publish(publishBuilder, consumer, autoFlush);
} else {
- registeredTasks.offer(() -> publish(publishBuilder, consumer));
+ registeredTasks.offer(() -> publish(publishBuilder, consumer, autoFlush));
}
}
- private void publish(MqttMessageBuilders.PublishBuilder publishBuilder, Consumer consumer) {
+ private void publish(MqttMessageBuilders.PublishBuilder publishBuilder, Consumer consumer, boolean autoFlush) {
if (publishBuilder.qos() == MqttQoS.AT_MOST_ONCE) {
- write(publishBuilder.build());
+ write(publishBuilder.build(), autoFlush);
consumer.accept(0);
return;
}
@@ -413,13 +433,7 @@ public class MqttClient extends AbstractSession {
MqttClient.this.notifyAll();
}
});
- if (inflightMessage != null) {
- flush();
- if (publishBuilder.qos() == MqttQoS.AT_MOST_ONCE) {
- inflightMessage.setResponseMessage(inflightMessage.getOriginalMessage());
- inflightQueue.commit(inflightMessage);
- }
- } else {
+ if (inflightMessage == null) {
try {
synchronized (this) {
wait();
@@ -427,9 +441,12 @@ public class MqttClient extends AbstractSession {
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
- publish(publishBuilder, consumer);
+ publish(publishBuilder, consumer, autoFlush);
+ return;
+ }
+ if (autoFlush) {
+ flush();
}
-
}
public MqttClientConfigure getClientConfigure() {
diff --git a/smart-mqtt-client/src/test/java/org/smartboot/mqtt/client/MqttClientBootstrap.java b/smart-mqtt-client/src/test/java/org/smartboot/mqtt/client/MqttClientBootstrap.java
index e7a7de8c8a5ccc5a10fbc24c744340c4c3f56fab..1299e20bec77bbe93beaae07219a1b8f36daf82f 100644
--- a/smart-mqtt-client/src/test/java/org/smartboot/mqtt/client/MqttClientBootstrap.java
+++ b/smart-mqtt-client/src/test/java/org/smartboot/mqtt/client/MqttClientBootstrap.java
@@ -1,6 +1,7 @@
package org.smartboot.mqtt.client;
import org.smartboot.mqtt.common.enums.MqttQoS;
+import org.smartboot.mqtt.common.enums.MqttVersion;
import org.smartboot.mqtt.common.message.payload.WillMessage;
import org.smartboot.mqtt.common.util.MqttUtil;
@@ -13,15 +14,15 @@ import java.nio.charset.StandardCharsets;
public class MqttClientBootstrap {
public static void main(String[] args) {
- MqttClient client = new MqttClient("localhost", 1883, MqttUtil.createClientId());
+ MqttClient client = new MqttClient("localhost", 1883, MqttUtil.createClientId(), MqttVersion.MQTT_5);
//心跳
client.getClientConfigure().setKeepAliveInterval(2).setAutomaticReconnect(true);
//遗嘱消息
WillMessage willMessage = new WillMessage();
- willMessage.setWillTopic("willTopic");
- willMessage.setWillRetain(true);
- willMessage.setWillMessage("helloWorld".getBytes(StandardCharsets.UTF_8));
+ willMessage.setTopic("willTopic");
+ willMessage.setRetained(true);
+ willMessage.setPayload("helloWorld".getBytes(StandardCharsets.UTF_8));
willMessage.setWillQos(MqttQoS.AT_MOST_ONCE);
client.willMessage(willMessage);
@@ -33,18 +34,18 @@ public class MqttClientBootstrap {
System.out.println("subscribe message:" + new String(publishMessage.getPayload().getPayload()));
}, (mqttClient, mqttQoS) -> {
//最多分发一次
- client.publish("test", MqttQoS.AT_MOST_ONCE, "aa".getBytes(StandardCharsets.UTF_8), false, packetId -> System.out.println("发送结果:" + packetId));
+ client.publish("test", MqttQoS.AT_MOST_ONCE, "aa".getBytes(StandardCharsets.UTF_8), packetId -> System.out.println("发送结果:" + packetId));
//至少分发一次
- client.publish("test", MqttQoS.AT_LEAST_ONCE, "bb".getBytes(StandardCharsets.UTF_8), false, packetId -> System.out.println("发送结果:" + packetId));
+ client.publish("test", MqttQoS.AT_LEAST_ONCE, "bb".getBytes(StandardCharsets.UTF_8), packetId -> System.out.println("发送结果:" + packetId));
//只分发一次
- client.publish("test", MqttQoS.EXACTLY_ONCE, "cc".getBytes(StandardCharsets.UTF_8), false, packetId -> System.out.println("发送结果:" + packetId));
+ client.publish("test", MqttQoS.EXACTLY_ONCE, "cc".getBytes(StandardCharsets.UTF_8), packetId -> System.out.println("发送结果:" + packetId));
});
client.subscribe("test/#", MqttQoS.AT_MOST_ONCE, (mqttClient, publishMessage) -> {
System.out.println("subscribe test/# message:" + new String(publishMessage.getPayload().getPayload()));
}, (mqttClient, mqttQoS) -> {
//只分发一次
- client.publish("test/dd", MqttQoS.EXACTLY_ONCE, "dd".getBytes(StandardCharsets.UTF_8), false, packetId -> System.out.println("发送结果:" + packetId));
+ client.publish("test/dd", MqttQoS.EXACTLY_ONCE, "dd".getBytes(StandardCharsets.UTF_8), packetId -> System.out.println("发送结果:" + packetId));
});
diff --git a/smart-mqtt-common/pom.xml b/smart-mqtt-common/pom.xml
index 60c42c94cfdd00c45041281851e1e04f74a17f2e..806073bedb739ac80dea9befe92aa3f7910c5fd1 100644
--- a/smart-mqtt-common/pom.xml
+++ b/smart-mqtt-common/pom.xml
@@ -5,7 +5,7 @@
smart-mqtt
org.smartboot.mqtt
- 0.19
+ 0.20
../pom.xml
4.0.0
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/DefaultMqttWriter.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/DefaultMqttWriter.java
index 7b1738046b1f0e6d0ab3f782babc2b9524597f7e..f6a2aa1a07683b9c7ee3b225b12feaec1d8ffc6f 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/DefaultMqttWriter.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/DefaultMqttWriter.java
@@ -10,6 +10,7 @@
package org.smartboot.mqtt.common;
+import org.smartboot.mqtt.common.util.ValidateUtils;
import org.smartboot.socket.transport.WriteBuffer;
import java.io.IOException;
@@ -20,28 +21,41 @@ import java.io.IOException;
*/
public class DefaultMqttWriter implements MqttWriter {
private final WriteBuffer writeBuffer;
+ private int size;
public DefaultMqttWriter(WriteBuffer writeBuffer) {
this.writeBuffer = writeBuffer;
}
+ @Override
+ public void reset() {
+ size = 0;
+ }
+
@Override
public void writeByte(byte b) {
+ size++;
writeBuffer.writeByte(b);
}
@Override
public void writeShort(short data) throws IOException {
+ ValidateUtils.isTrue(size != 0, "erro: writeShort can't write data, because writer is empty");
+ size += 2;
writeBuffer.writeShort(data);
}
@Override
public void writeInt(int data) throws IOException {
+ ValidateUtils.isTrue(size != 0, "erro: writeShort can't write data, because writer is empty");
+ size += 4;
writeBuffer.writeInt(data);
}
@Override
- public void write(byte[] data) throws IOException {
+ public synchronized void write(byte[] data) throws IOException {
+ ValidateUtils.isTrue(size != 0, "erro: writeShort can't write data, because writer is empty");
+ size += data.length;
writeBuffer.write(data);
}
@@ -49,4 +63,9 @@ public class DefaultMqttWriter implements MqttWriter {
public void flush() {
writeBuffer.flush();
}
+
+ @Override
+ public int writeSize() {
+ return size;
+ }
}
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java
index 992eac6edf13b580630d8f6149024099b7f18ec0..efb4f18992c0d53c7c2e566c80e470338add6859 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java
@@ -14,9 +14,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartboot.mqtt.common.enums.MqttMessageType;
import org.smartboot.mqtt.common.enums.MqttVersion;
-import org.smartboot.mqtt.common.message.MqttMessage;
+import org.smartboot.mqtt.common.message.MqttFixedHeader;
import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage;
import org.smartboot.mqtt.common.message.MqttPubRelMessage;
+import org.smartboot.mqtt.common.message.MqttPublishMessage;
import org.smartboot.mqtt.common.message.MqttVariableMessage;
import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader;
import org.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader;
@@ -121,9 +122,10 @@ public class InflightQueue {
switch (inflightMessage.getExpectMessageType()) {
case PUBACK:
case PUBREC:
- MqttMessage mqttMessage = inflightMessage.getOriginalMessage();
- mqttMessage.getFixedHeader().setDup(true);
- session.write(mqttMessage);
+ MqttPublishMessage mqttMessage = (MqttPublishMessage) inflightMessage.getOriginalMessage();
+ MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(mqttMessage.getFixedHeader().getMessageType(), true, mqttMessage.getFixedHeader().getQosLevel(), mqttMessage.getFixedHeader().isRetain());
+ MqttPublishMessage dupMessage = new MqttPublishMessage(mqttFixedHeader, mqttMessage.getVariableHeader(), mqttMessage.getPayload().getPayload());
+ session.write(dupMessage);
break;
case PUBCOMP:
ReasonProperties properties = null;
@@ -132,8 +134,7 @@ public class InflightQueue {
}
MqttVariableMessage extends MqttPacketIdVariableHeader> message = inflightMessage.getOriginalMessage();
MqttPubQosVariableHeader variableHeader = new MqttPubQosVariableHeader(message.getVariableHeader().getPacketId(), properties);
- MqttPubRelMessage pubRelMessage = new MqttPubRelMessage(variableHeader);
- pubRelMessage.getFixedHeader().setDup(true);
+ MqttPubRelMessage pubRelMessage = new MqttPubRelMessage(MqttFixedHeader.PUB_REL_HEADER_DUP, variableHeader);
session.write(pubRelMessage);
break;
default:
@@ -180,7 +181,7 @@ public class InflightQueue {
properties = new ReasonProperties();
}
MqttPubQosVariableHeader variableHeader = new MqttPubQosVariableHeader(message.getVariableHeader().getPacketId(), properties);
- MqttPubRelMessage pubRelMessage = new MqttPubRelMessage(variableHeader);
+ MqttPubRelMessage pubRelMessage = new MqttPubRelMessage(MqttFixedHeader.PUB_REL_HEADER, variableHeader);
session.write(pubRelMessage, false);
break;
default:
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/MqttWriter.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/MqttWriter.java
index 025d0e307a1e67d8c465ead6bf2c8ec3ace87658..c99e7ac4151cd9c5591af6e4824f445b80bdefbb 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/MqttWriter.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/MqttWriter.java
@@ -17,6 +17,8 @@ import java.io.IOException;
* @version V1.0 , 2022/12/2
*/
public interface MqttWriter {
+ void reset();
+
void writeByte(byte b);
void writeShort(short data) throws IOException;
@@ -26,4 +28,6 @@ public interface MqttWriter {
void write(byte[] data) throws IOException;
void flush();
+
+ int writeSize();
}
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosMessage.java
deleted file mode 100644
index fde98f7a25114a0b11bb60e4d35ecc62d9102808..0000000000000000000000000000000000000000
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/QosMessage.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright (C) [2022] smartboot [zhengjunweimail@163.com]
- *
- * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。
- *
- * Enterprise users are required to use this project reasonably
- * and legally in accordance with the AGPL-3.0 open source agreement
- * without special permission from the smartboot organization.
- */
-
-package org.smartboot.mqtt.common;
-
-import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage;
-import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader;
-
-import java.util.function.Consumer;
-
-/**
- * @author 三刀(zhengjunweimail@163.com)
- * @version V1.0 , 2023/3/27
- */
-public class QosMessage {
- private final MqttPacketIdentifierMessage extends MqttPacketIdVariableHeader> message;
- private final Consumer> consumer;
- private boolean commit;
-
- public QosMessage(MqttPacketIdentifierMessage extends MqttPacketIdVariableHeader> message, Consumer> consumer) {
- this.message = message;
- this.consumer = consumer;
- }
-
- public MqttPacketIdentifierMessage extends MqttPacketIdVariableHeader> getMessage() {
- return message;
- }
-
- public Consumer> getConsumer() {
- return consumer;
- }
-
- public boolean isCommit() {
- return commit;
- }
-
- public void setCommit(boolean commit) {
- this.commit = commit;
- }
-}
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/eventbus/EventBus.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/eventbus/EventBus.java
index d30028275c21473fbbbda88f8244d55daee74524..2080693f315221edad2a1d5a1f9266c08071e9ed 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/eventbus/EventBus.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/eventbus/EventBus.java
@@ -21,6 +21,7 @@ public interface EventBus {
void subscribe(EventType type, EventBusSubscriber subscriber);
void subscribe(List> types, EventBusSubscriber subscriber);
+
/**
* 发布消息至总线
*/
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnectMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnectMessage.java
index 22a64b58147ff77b621c01ab64562c22573f99cf..f67c906d6ca8683e8e53bfb1f746bb3004bd3548 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnectMessage.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/message/MqttConnectMessage.java
@@ -95,8 +95,8 @@ public class MqttConnectMessage extends MqttVariableMessage {
if (payloadBuffer.remaining() < remainingLength) {
break;
}
+ int p = payloadBuffer.position();
unit.mqttMessage.decodeVariableHeader(payloadBuffer);
- unit.state = READ_PAYLOAD;
-
- // fall through
- }
-
- case READ_PAYLOAD: {
if (unit.disposableBuffer == null) {
- unit.mqttMessage.decodePlayLoad(buffer);
+ unit.mqttMessage.decodePlayLoad(payloadBuffer);
+ ValidateUtils.isTrue((payloadBuffer.position() - p) == remainingLength, "Payload size is wrong");
} else {
- unit.mqttMessage.decodePlayLoad(unit.disposableBuffer);
- ValidateUtils.isTrue(unit.disposableBuffer.remaining() == 0, "decode error");
+ unit.mqttMessage.decodePlayLoad(payloadBuffer);
+ ValidateUtils.isTrue(payloadBuffer.remaining() == 0, "decode error");
unit.disposableBuffer = null;
}
unit.state = FINISH;
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttMessageBuilders.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttMessageBuilders.java
index e88fd486f603656268722eff1f769643bac71688..3496859664913a61485f889d86581248b304e308 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttMessageBuilders.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttMessageBuilders.java
@@ -109,7 +109,20 @@ public final class MqttMessageBuilders {
}
public MqttPublishMessage build() {
- MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, retained);
+ MqttFixedHeader mqttFixedHeader;
+ switch (qos) {
+ case AT_MOST_ONCE:
+ mqttFixedHeader = retained ? MqttFixedHeader.PUB_RETAIN_QOS0_HEADER : MqttFixedHeader.PUB_QOS0_HEADER;
+ break;
+ case AT_LEAST_ONCE:
+ mqttFixedHeader = retained ? MqttFixedHeader.PUB_RETAIN_QOS1_HEADER : MqttFixedHeader.PUB_QOS1_HEADER;
+ break;
+ case EXACTLY_ONCE:
+ mqttFixedHeader = retained ? MqttFixedHeader.PUB_RETAIN_QOS2_HEADER : MqttFixedHeader.PUB_QOS2_HEADER;
+ break;
+ default:
+ throw new IllegalStateException("qos value not supported");
+ }
MqttPublishVariableHeader mqttVariableHeader = new MqttPublishVariableHeader(packetId, topic, publishProperties);
return new MqttPublishMessage(mqttFixedHeader, mqttVariableHeader, payload);
}
diff --git a/smart-mqtt-data-persistence/pom.xml b/smart-mqtt-data-persistence/pom.xml
deleted file mode 100644
index 1e7bd7ba1f055c724a1d32bc26deadba5e389556..0000000000000000000000000000000000000000
--- a/smart-mqtt-data-persistence/pom.xml
+++ /dev/null
@@ -1,70 +0,0 @@
-
-
- smart-mqtt
- org.smartboot.mqtt
- 0.19
- ../pom.xml
-
- 4.0.0
-
- smart-mqtt-data-persistence
- jar
-
- smart-mqtt-data-persistence
-
-
- UTF-8
-
-
-
-
- org.smartboot.mqtt
- smart-mqtt-broker
-
-
-
- redis.clients
- jedis
- 4.3.1
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-compiler-plugin
- 3.10.1
-
- 1.8
- 1.8
- false
-
-
-
- maven-shade-plugin
- 3.2.4
-
-
- package
-
- shade
-
-
- false
-
-
-
- META-INF/services/org.smartboot.mqtt.broker.plugin.Plugin
-
-
-
-
-
-
-
-
-
-
diff --git a/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/DataPersistPlugin.java b/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/DataPersistPlugin.java
deleted file mode 100644
index 7a198db24d97b8f55ee58555c12dde0e9615c983..0000000000000000000000000000000000000000
--- a/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/DataPersistPlugin.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package org.smartboot.mqtt.data.persistence;
-
-
-import org.smartboot.mqtt.broker.plugin.Plugin;
-import org.smartboot.mqtt.data.persistence.config.DataSourcePluginConfig;
-
-
-public class DataPersistPlugin extends Plugin {
- private DataSourcePluginConfig config;
-
- public void setConfig(DataSourcePluginConfig config) {
- this.config = config;
- }
-
- public DataSourcePluginConfig getConfig() {
- return config;
- }
-}
diff --git a/smart-mqtt-data-persistence/src/main/resources/META-INF/services/org.smartboot.mqtt.broker.plugin.Plugin b/smart-mqtt-data-persistence/src/main/resources/META-INF/services/org.smartboot.mqtt.broker.plugin.Plugin
deleted file mode 100644
index 02f209379a7c536c70ec312e927860135ad8bdc9..0000000000000000000000000000000000000000
--- a/smart-mqtt-data-persistence/src/main/resources/META-INF/services/org.smartboot.mqtt.broker.plugin.Plugin
+++ /dev/null
@@ -1 +0,0 @@
-org.smartboot.mqtt.data.persistence.impl.RedisPlugin
\ No newline at end of file