diff --git a/docker-compose.yml b/docker-compose.yml
index dc762a834ec94f4f0c6ef942d774777b44c6ffd0..9367fa4ec3200f20a74cd4acc18a94346d996367 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -53,6 +53,6 @@ services:
options:
max-size: "100m"
max-file: "1"
-# command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=2000 -Dqos=2 -Dpublisher=10 -Dcount=1 -Dpayload=128 org.smartboot.bench.mqtt.Subscribe
- command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=2000 -Dqos=0 -Dcount=10 -Dpayload=128 org.smartboot.bench.mqtt.Publish
+ command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=2000 -Dqos=2 -Dpublisher=10 -Dcount=1 -Dpayload=128 org.smartboot.bench.mqtt.Subscribe
+# command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=2000 -Dqos=0 -Dcount=10 -Dpayload=128 org.smartboot.bench.mqtt.Publish
version: '3.7'
\ No newline at end of file
diff --git a/plugins/pom.xml b/plugins/pom.xml
index 24bee1331b7cd8f316cf93888bbb61e77d367947..bee091216462c50f10872c5e2ef5c1d2c60dc838 100644
--- a/plugins/pom.xml
+++ b/plugins/pom.xml
@@ -16,7 +16,7 @@
org.smartboot.mqtt
smart-mqtt
- 0.20
+ 0.21
../pom.xml
pom
diff --git a/plugins/redis-bridge-plugin/pom.xml b/plugins/redis-bridge-plugin/pom.xml
index a89708bce7e8fa9b05a3d968b2af0de5fbbc8ed4..f4e16694b6ad5b6f2d9a1e9c84def2e413a8d367 100644
--- a/plugins/redis-bridge-plugin/pom.xml
+++ b/plugins/redis-bridge-plugin/pom.xml
@@ -3,7 +3,7 @@
plugins
org.smartboot.mqtt
- 0.20
+ 0.21
../pom.xml
4.0.0
diff --git a/pom.xml b/pom.xml
index cacb49b895c2225db9da774b43fe88c3d99471c4..6e52628a2764665e6846beeb173b3cd5bfafc3f5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,13 +4,13 @@
org.smartboot.mqtt
smart-mqtt
smart-mqtt
- 0.20
+ 0.21
4.0.0
mqtt broker
- 0.20
- 1.5.27
+ 0.21
+ 1.5.29
1.1.22
2.6
4.3
@@ -66,7 +66,7 @@
com.alibaba.fastjson2
fastjson2
- 2.0.20.graal
+ 2.0.21.graal
junit
diff --git a/smart-mqtt-broker/pom.xml b/smart-mqtt-broker/pom.xml
index e50e103d6e342f79e62b2f98a4b79f75a771de80..aacc07888dd035fbdb3ae17357429d74f670f55f 100644
--- a/smart-mqtt-broker/pom.xml
+++ b/smart-mqtt-broker/pom.xml
@@ -5,7 +5,7 @@
org.smartboot.mqtt
smart-mqtt
- 0.20
+ 0.21
../pom.xml
4.0.0
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java
index caa7f048ecd297e3113f5d880a02f1c965af34c9..408aa30e2b43cdd674a07353d5dd388bbab7f51e 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java
@@ -11,8 +11,12 @@
package org.smartboot.mqtt.broker;
import org.smartboot.mqtt.common.ToString;
+import org.smartboot.mqtt.common.message.MqttMessage;
+import org.smartboot.socket.extension.plugins.Plugin;
import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
/**
@@ -38,7 +42,7 @@ public class BrokerConfigure extends ToString {
/**
* 当前smart-mqtt
*/
- public static final String VERSION = "v0.20";
+ public static final String VERSION = "v0.21";
static final Map SystemEnvironments = new HashMap<>();
@@ -100,6 +104,8 @@ public class BrokerConfigure extends ToString {
*/
private int maxInflight = 8;
+ private final List> plugins = new LinkedList<>();
+
public int getPort() {
return port;
}
@@ -188,6 +194,15 @@ public class BrokerConfigure extends ToString {
this.name = name;
}
+ public BrokerConfigure addPlugin(Plugin plugin) {
+ plugins.add(plugin);
+ return this;
+ }
+
+ public List> getPlugins() {
+ return plugins;
+ }
+
@Override
public String toString() {
return "BrokerConfigure{" +
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java
index 28c8cc490e1d6db3fe2e58791835cdda8336117a..1e68393344379b50b4def018e9ae3b9f2992a931 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java
@@ -11,13 +11,16 @@
package org.smartboot.mqtt.broker;
import org.smartboot.mqtt.broker.eventbus.messagebus.MessageBus;
+import org.smartboot.mqtt.broker.processor.MqttProcessor;
import org.smartboot.mqtt.broker.provider.Providers;
-import org.smartboot.mqtt.common.enums.MqttMetricEnum;
+import org.smartboot.mqtt.broker.topic.TopicPublishTree;
+import org.smartboot.mqtt.broker.topic.TopicSubscribeTree;
import org.smartboot.mqtt.common.eventbus.EventBus;
-import org.smartboot.mqtt.common.to.MetricItemTO;
+import org.smartboot.mqtt.common.message.MqttMessage;
import java.io.IOException;
import java.util.Collection;
+import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
/**
@@ -84,11 +87,9 @@ public interface BrokerContext {
*/
T parseConfig(String path, Class clazz);
- MqttBrokerMessageProcessor getMessageProcessor();
+ Map, MqttProcessor>> getMessageProcessors();
- /**
- * 运行指标
- */
- MetricItemTO metric(MqttMetricEnum metricEnum);
+ TopicPublishTree getPublishTopicTree();
+ TopicSubscribeTree getTopicSubscribeTree();
}
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java
index 8b61bb1632f865c8cca1c151e550eba3bb6f9854..bdf8b6fade4e74403fa5accc3c6c2f847d6873d2 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java
@@ -13,6 +13,7 @@ package org.smartboot.mqtt.broker;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.JSONPath;
import com.alibaba.fastjson2.JSONReader;
+import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -23,33 +24,49 @@ import org.smartboot.mqtt.broker.eventbus.messagebus.MessageBus;
import org.smartboot.mqtt.broker.eventbus.messagebus.MessageBusSubscriber;
import org.smartboot.mqtt.broker.eventbus.messagebus.consumer.RetainPersistenceConsumer;
import org.smartboot.mqtt.broker.plugin.Plugin;
+import org.smartboot.mqtt.broker.processor.ConnectProcessor;
+import org.smartboot.mqtt.broker.processor.DisConnectProcessor;
+import org.smartboot.mqtt.broker.processor.MqttAckProcessor;
+import org.smartboot.mqtt.broker.processor.MqttProcessor;
+import org.smartboot.mqtt.broker.processor.PingReqProcessor;
+import org.smartboot.mqtt.broker.processor.PubRelProcessor;
+import org.smartboot.mqtt.broker.processor.PublishProcessor;
+import org.smartboot.mqtt.broker.processor.SubscribeProcessor;
+import org.smartboot.mqtt.broker.processor.UnSubscribeProcessor;
import org.smartboot.mqtt.broker.provider.Providers;
import org.smartboot.mqtt.broker.provider.impl.message.PersistenceMessage;
+import org.smartboot.mqtt.broker.topic.TopicPublishTree;
+import org.smartboot.mqtt.broker.topic.TopicSubscribeTree;
import org.smartboot.mqtt.common.AsyncTask;
import org.smartboot.mqtt.common.InflightQueue;
import org.smartboot.mqtt.common.QosRetryPlugin;
-import org.smartboot.mqtt.common.enums.MqttMetricEnum;
import org.smartboot.mqtt.common.enums.MqttQoS;
import org.smartboot.mqtt.common.enums.MqttVersion;
import org.smartboot.mqtt.common.eventbus.EventBus;
import org.smartboot.mqtt.common.eventbus.EventBusImpl;
import org.smartboot.mqtt.common.eventbus.EventBusSubscriber;
import org.smartboot.mqtt.common.eventbus.EventType;
-import org.smartboot.mqtt.common.message.MqttConnAckMessage;
import org.smartboot.mqtt.common.message.MqttConnectMessage;
+import org.smartboot.mqtt.common.message.MqttDisconnectMessage;
import org.smartboot.mqtt.common.message.MqttMessage;
+import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage;
+import org.smartboot.mqtt.common.message.MqttPingReqMessage;
+import org.smartboot.mqtt.common.message.MqttPubAckMessage;
+import org.smartboot.mqtt.common.message.MqttPubCompMessage;
+import org.smartboot.mqtt.common.message.MqttPubRecMessage;
+import org.smartboot.mqtt.common.message.MqttPubRelMessage;
import org.smartboot.mqtt.common.message.MqttPublishMessage;
+import org.smartboot.mqtt.common.message.MqttSubscribeMessage;
+import org.smartboot.mqtt.common.message.MqttUnsubscribeMessage;
+import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader;
import org.smartboot.mqtt.common.message.variable.properties.PublishProperties;
import org.smartboot.mqtt.common.protocol.MqttProtocol;
-import org.smartboot.mqtt.common.to.MetricItemTO;
import org.smartboot.mqtt.common.util.MqttMessageBuilders;
import org.smartboot.mqtt.common.util.MqttUtil;
import org.smartboot.mqtt.common.util.ValidateUtils;
import org.smartboot.socket.buffer.BufferPagePool;
import org.smartboot.socket.enhance.EnhanceAsynchronousChannelProvider;
-import org.smartboot.socket.extension.plugins.AbstractPlugin;
import org.smartboot.socket.transport.AioQuickServer;
-import org.smartboot.socket.transport.AioSession;
import org.yaml.snakeyaml.Yaml;
import java.io.IOException;
@@ -69,6 +86,7 @@ import java.util.Properties;
import java.util.ServiceLoader;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
@@ -93,6 +111,9 @@ public class BrokerContextImpl implements BrokerContext {
*/
private final ConcurrentMap topicMap = new ConcurrentHashMap<>();
private BrokerConfigure brokerConfigure = new BrokerConfigure();
+ private final TopicPublishTree topicPublishTree = new TopicPublishTree();
+
+ private final TopicSubscribeTree subscribeTopicTree = new TopicSubscribeTree();
/**
* Keep-Alive监听线程
*/
@@ -118,12 +139,25 @@ public class BrokerContextImpl implements BrokerContext {
private String configJson;
private final static BrokerTopic SHUTDOWN_TOPIC = new BrokerTopic("");
- /**
- * 统计指标
- */
- private final Map metricMap = new HashMap<>();
private AsynchronousChannelGroup asynchronousChannelGroup;
+ private final Map, MqttProcessor>> processors;
+
+ {
+ Map, MqttProcessor>> mqttProcessors = new HashMap<>();
+ mqttProcessors.put(MqttPingReqMessage.class, new PingReqProcessor());
+ mqttProcessors.put(MqttConnectMessage.class, new ConnectProcessor());
+ mqttProcessors.put(MqttPublishMessage.class, new PublishProcessor());
+ mqttProcessors.put(MqttSubscribeMessage.class, new SubscribeProcessor());
+ mqttProcessors.put(MqttUnsubscribeMessage.class, new UnSubscribeProcessor());
+ mqttProcessors.put(MqttPubAckMessage.class, new MqttAckProcessor<>());
+ mqttProcessors.put(MqttPubRelMessage.class, new PubRelProcessor());
+ mqttProcessors.put(MqttPubRecMessage.class, new MqttAckProcessor<>());
+ mqttProcessors.put(MqttPubCompMessage.class, new MqttAckProcessor<>());
+ mqttProcessors.put(MqttDisconnectMessage.class, new DisConnectProcessor());
+ processors = MapUtils.unmodifiableMap(mqttProcessors);
+ }
+
@Override
public void init() throws IOException {
@@ -135,8 +169,6 @@ public class BrokerContextImpl implements BrokerContext {
initPushThread();
- initMetric();
-
loadAndInstallPlugins();
@@ -150,7 +182,8 @@ public class BrokerContextImpl implements BrokerContext {
}
});
pagePool = new BufferPagePool(10 * 1024 * 1024, brokerConfigure.getThreadNum(), true);
- processor.addPlugin(new QosRetryPlugin());
+ brokerConfigure.addPlugin(new QosRetryPlugin());
+ brokerConfigure.getPlugins().forEach(processor::addPlugin);
server = new AioQuickServer(brokerConfigure.getHost(), brokerConfigure.getPort(), new MqttProtocol(brokerConfigure.getMaxPacketSize()), processor);
server.setBannerEnabled(false).setReadBufferSize(brokerConfigure.getBufferSize()).setWriteBuffer(brokerConfigure.getBufferSize(), Math.min(brokerConfigure.getMaxInflight(), 16)).setBufferPagePool(pagePool).setThreadNum(Math.max(2, brokerConfigure.getThreadNum()));
server.start(asynchronousChannelGroup);
@@ -182,69 +215,6 @@ public class BrokerContextImpl implements BrokerContext {
configJson = null;
}
- private void initMetric() {
- for (MqttMetricEnum metricEnum : MqttMetricEnum.values()) {
- metricMap.put(metricEnum, new MetricItemTO(metricEnum));
- }
-
- processor.addPlugin(new AbstractPlugin() {
- @Override
- public void afterRead(AioSession session, int readSize) {
- if (readSize > 0) {
- metricMap.get(MqttMetricEnum.BYTES_RECEIVED).getMetric().add(readSize);
- }
- }
-
- @Override
- public void afterWrite(AioSession session, int writeSize) {
- if (writeSize > 0) {
- metricMap.get(MqttMetricEnum.BYTES_SENT).getMetric().add(writeSize);
- }
- }
- });
- eventBus.subscribe(ServerEventType.CONNECT, (eventType, object) -> metricMap.get(MqttMetricEnum.CLIENT_CONNECT).getMetric().increment());
- eventBus.subscribe(ServerEventType.DISCONNECT, (eventType, object) -> metricMap.get(MqttMetricEnum.CLIENT_DISCONNECT).getMetric().increment());
- eventBus.subscribe(ServerEventType.SUBSCRIBE_ACCEPT, (eventType, object) -> metricMap.get(MqttMetricEnum.CLIENT_SUBSCRIBE).getMetric().increment());
- eventBus.subscribe(ServerEventType.UNSUBSCRIBE_ACCEPT, (eventType, object) -> metricMap.get(MqttMetricEnum.CLIENT_UNSUBSCRIBE).getMetric().increment());
- eventBus.subscribe(EventType.RECEIVE_MESSAGE, (eventType, object) -> {
- metricMap.get(MqttMetricEnum.PACKETS_RECEIVED).getMetric().increment();
- if (object.getObject() instanceof MqttConnectMessage) {
- metricMap.get(MqttMetricEnum.PACKETS_CONNECT_RECEIVED).getMetric().increment();
- }
- });
- eventBus.subscribe(EventType.WRITE_MESSAGE, (eventType, object) -> {
- metricMap.get(MqttMetricEnum.PACKETS_SENT).getMetric().increment();
- if (object.getObject() instanceof MqttConnAckMessage) {
- metricMap.get(MqttMetricEnum.PACKETS_CONNACK_SENT).getMetric().increment();
- } else if (object.getObject() instanceof MqttPublishMessage) {
- switch (object.getObject().getFixedHeader().getQosLevel()) {
- case AT_MOST_ONCE:
- metricMap.get(MqttMetricEnum.MESSAGE_QOS0_SENT).getMetric().increment();
- break;
- case AT_LEAST_ONCE:
- metricMap.get(MqttMetricEnum.MESSAGE_QOS1_SENT).getMetric().increment();
- break;
- case EXACTLY_ONCE:
- metricMap.get(MqttMetricEnum.MESSAGE_QOS2_SENT).getMetric().increment();
- break;
- }
- }
- });
- messageBusSubscriber.consumer((brokerContext1, publishMessage) -> {
- switch (publishMessage.getFixedHeader().getQosLevel()) {
- case AT_MOST_ONCE:
- metricMap.get(MqttMetricEnum.MESSAGE_QOS0_RECEIVED).getMetric().increment();
- break;
- case AT_LEAST_ONCE:
- metricMap.get(MqttMetricEnum.MESSAGE_QOS1_RECEIVED).getMetric().increment();
- break;
- case EXACTLY_ONCE:
- metricMap.get(MqttMetricEnum.MESSAGE_QOS2_RECEIVED).getMetric().increment();
- break;
- }
-
- });
- }
private final TopicSubscriber BREAK = new TopicSubscriber(null, null, null, 0, 0);
@@ -344,6 +314,8 @@ public class BrokerContextImpl implements BrokerContext {
notifyPush(brokerTopic);
});
+ eventBus.subscribe(ServerEventType.NOTIFY_TOPIC_PUSH, (eventType, object) -> notifyPush(object));
+
//一个新的订阅建立时,对每个匹配的主题名,如果存在最近保留的消息,它必须被发送给这个订阅者
eventBus.subscribe(ServerEventType.SUBSCRIBE_TOPIC, new EventBusSubscriber() {
@Override
@@ -357,12 +329,6 @@ public class BrokerContextImpl implements BrokerContext {
BrokerTopic topic = subscriber.getTopic();
topic.getQueue().offer(subscriber);
notifyPush(topic);
-//
-// int preVersion = subscriber.getTopic().getVersion().get();
-// subscriber.batchPublish(BrokerContextImpl.this);
-// if (preVersion != subscriber.getTopic().getVersion().get()) {
-// notifyPush(subscriber.getTopic());
-// }
return;
}
//retain采用严格顺序publish模式
@@ -382,7 +348,8 @@ public class BrokerContextImpl implements BrokerContext {
}
InflightQueue inflightQueue = session.getInflightQueue();
// retain消息逐个推送
- inflightQueue.offer(publishBuilder, (mqtt) -> {
+ CompletableFuture> future = inflightQueue.offer(publishBuilder);
+ future.whenComplete((mqttPacketIdentifierMessage, throwable) -> {
LOGGER.info("publish retain to client:{} success ", session.getClientId());
subscriber.setRetainConsumerOffset(offset + 1);
retainPushThreadPool.execute(task);
@@ -392,10 +359,13 @@ public class BrokerContextImpl implements BrokerContext {
});
}
});
- eventBus.subscribe(ServerEventType.SUBSCRIBE_REFRESH_TOPIC, (eventType, subscriber) -> {
- LOGGER.info("刷新订阅关系, {} 订阅了topic: {}", subscriber.getTopicFilterToken().getTopicFilter(), subscriber.getTopic().getTopic());
- subscriber.getTopic().getQueue().offer(subscriber);
- });
+
+ eventBus.subscribe(ServerEventType.TOPIC_CREATE, (eventType, object) -> subscribeTopicTree.match(object.getTopicToken(), (session, topicFilterSubscriber) -> {
+ if (!providers.getSubscribeProvider().subscribeTopic(object.getTopic(), session)) {
+ return;
+ }
+ session.subscribeSuccess(topicFilterSubscriber.getMqttQoS(), topicFilterSubscriber.getTopicFilterToken(), object);
+ }));
}
private void notifyPush(BrokerTopic topic) {
@@ -478,9 +448,8 @@ public class BrokerContextImpl implements BrokerContext {
public BrokerTopic getOrCreateTopic(String topic) {
return topicMap.computeIfAbsent(topic, topicName -> {
ValidateUtils.isTrue(!MqttUtil.containsTopicWildcards(topicName), "invalid topicName: " + topicName);
- BrokerTopic newTopic = new BrokerTopic(topicName);
+ BrokerTopic newTopic = topicPublishTree.addTopic(topic);
eventBus.publish(ServerEventType.TOPIC_CREATE, newTopic);
- metric(MqttMetricEnum.TOPIC_COUNT).getMetric().increment();
return newTopic;
});
}
@@ -548,6 +517,16 @@ public class BrokerContextImpl implements BrokerContext {
}
}
+ @Override
+ public TopicPublishTree getPublishTopicTree() {
+ return topicPublishTree;
+ }
+
+ @Override
+ public TopicSubscribeTree getTopicSubscribeTree() {
+ return subscribeTopicTree;
+ }
+
public void loadYamlConfig() throws IOException {
String brokerConfig = System.getProperty(BrokerConfigure.SystemProperty.BrokerConfig);
InputStream inputStream = null;
@@ -567,16 +546,13 @@ public class BrokerContextImpl implements BrokerContext {
}
}
- @Override
- public MqttBrokerMessageProcessor getMessageProcessor() {
- return processor;
- }
@Override
- public MetricItemTO metric(MqttMetricEnum metricEnum) {
- return metricMap.get(metricEnum);
+ public Map, MqttProcessor>> getMessageProcessors() {
+ return processors;
}
+
@Override
public void destroy() {
LOGGER.info("destroy broker...");
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java
index d781f8b17304b6376ca2c5688daf55f04937d2f6..8bfae4a5e96dc115d504cab97b713ddb57ca1fc9 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java
@@ -12,40 +12,19 @@ package org.smartboot.mqtt.broker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.smartboot.mqtt.broker.processor.ConnectProcessor;
-import org.smartboot.mqtt.broker.processor.DisConnectProcessor;
-import org.smartboot.mqtt.broker.processor.MqttAckProcessor;
import org.smartboot.mqtt.broker.processor.MqttProcessor;
-import org.smartboot.mqtt.broker.processor.PingReqProcessor;
-import org.smartboot.mqtt.broker.processor.PubRelProcessor;
-import org.smartboot.mqtt.broker.processor.PublishProcessor;
-import org.smartboot.mqtt.broker.processor.SubscribeProcessor;
-import org.smartboot.mqtt.broker.processor.UnSubscribeProcessor;
import org.smartboot.mqtt.common.DefaultMqttWriter;
-import org.smartboot.mqtt.common.enums.MqttMetricEnum;
import org.smartboot.mqtt.common.eventbus.EventObject;
import org.smartboot.mqtt.common.eventbus.EventType;
import org.smartboot.mqtt.common.exception.MqttException;
-import org.smartboot.mqtt.common.message.MqttConnectMessage;
-import org.smartboot.mqtt.common.message.MqttDisconnectMessage;
import org.smartboot.mqtt.common.message.MqttMessage;
-import org.smartboot.mqtt.common.message.MqttPingReqMessage;
-import org.smartboot.mqtt.common.message.MqttPubAckMessage;
-import org.smartboot.mqtt.common.message.MqttPubCompMessage;
-import org.smartboot.mqtt.common.message.MqttPubRecMessage;
-import org.smartboot.mqtt.common.message.MqttPubRelMessage;
-import org.smartboot.mqtt.common.message.MqttPublishMessage;
-import org.smartboot.mqtt.common.message.MqttSubscribeMessage;
-import org.smartboot.mqtt.common.message.MqttUnsubscribeMessage;
+import org.smartboot.mqtt.common.util.MqttAttachKey;
import org.smartboot.socket.StateMachineEnum;
import org.smartboot.socket.extension.processor.AbstractMessageProcessor;
import org.smartboot.socket.transport.AioSession;
+import org.smartboot.socket.util.AttachKey;
import org.smartboot.socket.util.Attachment;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
/**
* @author 三刀
* @version V1.0 , 2018/4/24
@@ -56,25 +35,9 @@ public class MqttBrokerMessageProcessor extends AbstractMessageProcessor onlineSessions = new ConcurrentHashMap<>();
- private final Map, MqttProcessor> processorMap = new HashMap<>();
- {
- processorMap.put(MqttPingReqMessage.class, new PingReqProcessor());
- processorMap.put(MqttConnectMessage.class, new ConnectProcessor());
- processorMap.put(MqttPublishMessage.class, new PublishProcessor());
- processorMap.put(MqttSubscribeMessage.class, new SubscribeProcessor());
- processorMap.put(MqttUnsubscribeMessage.class, new UnSubscribeProcessor());
- processorMap.put(MqttPubAckMessage.class, new MqttAckProcessor<>());
- processorMap.put(MqttPubRelMessage.class, new PubRelProcessor());
- processorMap.put(MqttPubRecMessage.class, new MqttAckProcessor<>());
- processorMap.put(MqttPubCompMessage.class, new MqttAckProcessor<>());
- processorMap.put(MqttDisconnectMessage.class, new DisConnectProcessor());
-// addPlugin(new RateLimiterPlugin<>(1024 * 512, 1024 * 512));
- }
+ private final static AttachKey SESSION_KEY = AttachKey.valueOf(MqttAttachKey.MQTT_SESSION);
+
public MqttBrokerMessageProcessor(BrokerContext mqttContext) {
this.mqttContext = mqttContext;
@@ -82,9 +45,10 @@ public class MqttBrokerMessageProcessor extends AbstractMessageProcessor getOnlineSessions() {
- return onlineSessions;
- }
-
}
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java
index 713295a415134565879b025863aa275f18a1484e..02a990fcbdeeb3226bbf681b34cd86a7a5a22ae1 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java
@@ -18,11 +18,9 @@ import org.smartboot.mqtt.common.AbstractSession;
import org.smartboot.mqtt.common.MqttWriter;
import org.smartboot.mqtt.common.TopicToken;
import org.smartboot.mqtt.common.enums.MqttQoS;
-import org.smartboot.mqtt.common.eventbus.EventBusSubscriber;
import org.smartboot.mqtt.common.eventbus.EventType;
import org.smartboot.mqtt.common.message.MqttPublishMessage;
import org.smartboot.mqtt.common.message.variable.properties.ConnectProperties;
-import org.smartboot.mqtt.common.util.MqttUtil;
import org.smartboot.mqtt.common.util.ValidateUtils;
import org.smartboot.socket.transport.AioSession;
@@ -134,71 +132,49 @@ public class MqttSession extends AbstractSession {
public MqttQoS subscribe(String topicFilter, MqttQoS mqttQoS) {
if (mqttContext.getProviders().getSubscribeProvider().subscribeTopic(topicFilter, this)) {
- subscribe0(topicFilter, mqttQoS, true);
+ subscribe0(topicFilter, mqttQoS);
return mqttQoS;
} else {
return MqttQoS.FAILURE;
}
}
- private void subscribe0(String topicFilter, MqttQoS mqttQoS, boolean newSubscribe) {
+ private void subscribe0(String topicFilter, MqttQoS mqttQoS) {
+ TopicFilterSubscriber subscriber = subscribers.get(topicFilter);
+ if (subscriber != null) {
+ subscriber.setMqttQoS(mqttQoS);
+ subscriber.getTopicSubscribers().values().forEach(sub -> sub.setMqttQoS(mqttQoS));
+ return;
+ }
TopicToken topicToken = new TopicToken(topicFilter);
- //精准匹配
if (!topicToken.isWildcards()) {
- BrokerTopic topic = mqttContext.getOrCreateTopic(topicToken.getTopicFilter());//可能会先触发TopicFilterSubscriber.subscribe
- TopicSubscriber subscription = subscribeSuccess(mqttQoS, topicToken, topic);
- if (newSubscribe) {
- mqttContext.getEventBus().publish(ServerEventType.SUBSCRIBE_TOPIC, subscription);
- } else {
- mqttContext.getEventBus().publish(ServerEventType.SUBSCRIBE_REFRESH_TOPIC, subscription);
- }
- return;
+ mqttContext.getOrCreateTopic(topicFilter);
}
-
- //通配符匹配存量Topic
- for (BrokerTopic topic : mqttContext.getTopics()) {
- if (MqttUtil.match(topic.getTopicToken(), topicToken) && mqttContext.getProviders().getSubscribeProvider().subscribeTopic(topic.getTopic(), this)) {
- TopicSubscriber subscription = subscribeSuccess(mqttQoS, topicToken, topic);
- if (newSubscribe) {
- mqttContext.getEventBus().publish(ServerEventType.SUBSCRIBE_TOPIC, subscription);
- } else {
- mqttContext.getEventBus().publish(ServerEventType.SUBSCRIBE_REFRESH_TOPIC, subscription);
+ subscriber = new TopicFilterSubscriber(topicToken, mqttQoS);
+ TopicFilterSubscriber preSubscriber = subscribers.put(topicFilter, subscriber);
+ ValidateUtils.isTrue(preSubscriber == null, "duplicate topic filter");
+ mqttContext.getTopicSubscribeTree().subscribeTopic(this, subscriber);
+ mqttContext.getPublishTopicTree().match(topicToken, topic -> subscribeSuccess(mqttQoS, topicToken, topic));
+ }
+
+ public void subscribeSuccess(MqttQoS mqttQoS, TopicToken topicToken, BrokerTopic topic) {
+ TopicSubscriber topicSubscriber = topic.getConsumeOffsets().get(this);
+ if (topicSubscriber != null) {
+ //此前的订阅关系
+ TopicToken preToken = topicSubscriber.getTopicFilterToken();
+ if (preToken.isWildcards()) {
+ if (!topicToken.isWildcards() || topicToken.getTopicFilter().length() > preToken.getTopicFilter().length()) {
+ //解除旧的订阅关系
+ TopicSubscriber preSubscription = subscribers.get(preToken.getTopicFilter()).getTopicSubscribers().remove(topic);
+ preSubscription.setMqttQoS(mqttQoS);
+ preSubscription.setTopicFilterToken(topicToken);
+ //绑定新的订阅关系
+ subscribers.get(topicToken.getTopicFilter()).getTopicSubscribers().put(topic, preSubscription);
+ mqttContext.getEventBus().publish(ServerEventType.SUBSCRIBE_REFRESH_TOPIC, preSubscription);
}
}
+ return;
}
-
- //通配符匹配增量Topic
- if (!subscribers.containsKey(topicFilter)) {
- subscribers.put(topicFilter, new TopicFilterSubscriber(topicToken, mqttQoS));
- }
- if (newSubscribe) {
- mqttContext.getEventBus().subscribe(ServerEventType.TOPIC_CREATE, new EventBusSubscriber() {
- @Override
- public boolean enable() {
- boolean enable = !disconnect && subscribers.containsKey(topicFilter);
- if (!enable) {
- LOGGER.info("current event is disable,quit topic:{} monitor", topicFilter);
- }
- return enable;
- }
-
- @Override
- public void subscribe(EventType eventType, BrokerTopic object) {
- if (MqttUtil.match(object.getTopicToken(), topicToken) && mqttContext.getProviders().getSubscribeProvider().subscribeTopic(object.getTopic(), MqttSession.this)) {
- TopicSubscriber subscription = MqttSession.this.subscribeSuccess(mqttQoS, topicToken, object);
- mqttContext.getEventBus().publish(ServerEventType.SUBSCRIBE_TOPIC, subscription);
- }
- }
- });
- }
- }
-
- /**
- * retain消息消费点位记录
- */
- private final Map retainOffsetCache = new HashMap<>();
-
- private TopicSubscriber subscribeSuccess(MqttQoS mqttQoS, TopicToken topicToken, BrokerTopic topic) {
long latestOffset = mqttContext.getProviders().getPersistenceProvider().getLatestOffset(topic.getTopic());
// retain消费点位优先以缓存为准
Long retainOffset = retainOffsetCache.get(topic);
@@ -207,43 +183,23 @@ public class MqttSession extends AbstractSession {
retainOffset = oldestRetainOffset;
}
//以当前消息队列的最新点位为起始点位
- TopicSubscriber subscription = new TopicSubscriber(topic, this, mqttQoS, latestOffset + 1, retainOffset);
+ TopicSubscriber subscription = new TopicSubscriber(topic, MqttSession.this, mqttQoS, latestOffset + 1, retainOffset);
subscription.setTopicFilterToken(topicToken);
- // 如果服务端收到一个 SUBSCRIBE 报文,
- //报文的主题过滤器与一个现存订阅的主题过滤器相同,
- // 那么必须使用新的订阅彻底替换现存的订阅。
- // 新订阅的主题过滤器和之前订阅的相同,但是它的最大 QoS 值可以不同。
- ValidateUtils.isTrue(!disconnect, "session has closed,can not subscribe topic");
-
- subscribers.values().forEach(topicFilterSubscriber -> {
- TopicSubscriber oldOffset = topicFilterSubscriber.getTopicSubscribers().remove(topic.getTopic());
- if (oldOffset != null) {
- TopicSubscriber consumerOffset = oldOffset.getTopic().getConsumeOffsets().remove(this);
- consumerOffset.disable();
- LOGGER.info("remove topic:{} {},", topic, oldOffset == consumerOffset ? "success" : "fail");
- }
- });
-
- TopicFilterSubscriber topicFilterSubscriber = subscribers.get(subscription.getTopicFilterToken().getTopicFilter());
- if (topicFilterSubscriber == null) {
- topicFilterSubscriber = new TopicFilterSubscriber(subscription.getTopicFilterToken(), subscription.getMqttQoS(), subscription);
- subscribers.put(subscription.getTopicFilterToken().getTopicFilter(), topicFilterSubscriber);
- } else {
- topicFilterSubscriber.getTopicSubscribers().put(subscription.getTopic().getTopic(), subscription);
- }
- TopicSubscriber preTopicSubscriber = subscription.getTopic().getConsumeOffsets().put(this, subscription);
- if (preTopicSubscriber != null) {
- LOGGER.error("invalid state...");
- } else {
- LOGGER.debug("new subscribe topic:{} success by topicFilter:{}", subscription.getTopic().getTopic(), subscription.getTopicFilterToken().getTopicFilter());
- }
-
- return subscription;
+ topic.getConsumeOffsets().put(MqttSession.this, subscription);
+ subscribers.get(topicToken.getTopicFilter()).getTopicSubscribers().put(topic, subscription);
+ mqttContext.getEventBus().publish(ServerEventType.SUBSCRIBE_TOPIC, subscription);
}
+ /**
+ * retain消息消费点位记录
+ */
+ private final Map retainOffsetCache = new HashMap<>();
+
public void resubscribe() {
- subscribers.values().forEach(subscriber -> subscribe0(subscriber.getTopicFilterToken().getTopicFilter(), subscriber.getMqttQoS(), false));
+ subscribers.values().stream().filter(subscriber -> subscriber.getTopicFilterToken().isWildcards()).forEach(subscriber -> {
+ mqttContext.getPublishTopicTree().match(subscriber.getTopicFilterToken(), topic -> subscribeSuccess(subscriber.getMqttQoS(), subscriber.getTopicFilterToken(), topic));
+ });
}
public void unsubscribe(String topicFilter) {
@@ -261,6 +217,7 @@ public class MqttSession extends AbstractSession {
LOGGER.error("remove subscriber:{} error!", removeSubscriber);
}
});
+ mqttContext.getTopicSubscribeTree().unsubscribe(this, filterSubscriber);
}
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicFilterSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicFilterSubscriber.java
index 75f1691f99be09998d488d5d01a2afbcad315a88..7047f2ef7e7f347f27172dff05d6ef9ed41dc540 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicFilterSubscriber.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicFilterSubscriber.java
@@ -22,25 +22,19 @@ import java.util.Map;
* @author 三刀(zhengjunweimail@163.com)
* @version V1.0 , 2022/7/13
*/
-class TopicFilterSubscriber {
+public class TopicFilterSubscriber {
private final TopicToken topicFilterToken;
- private final MqttQoS mqttQoS;
+ private MqttQoS mqttQoS;
/**
* 客户端订阅所匹配的Topic。通配符订阅时可能有多个
*/
- private final Map topicSubscribers;
+ private final Map topicSubscribers = new HashMap<>();
- public TopicFilterSubscriber(TopicToken topicFilterToken, MqttQoS mqttQoS) {
+ TopicFilterSubscriber(TopicToken topicFilterToken, MqttQoS mqttQoS) {
this.topicFilterToken = topicFilterToken;
this.mqttQoS = mqttQoS;
- this.topicSubscribers = new HashMap<>();
- }
-
- public TopicFilterSubscriber(TopicToken topicFilterToken, MqttQoS mqttQoS, TopicSubscriber topicSubscriber) {
- this(topicFilterToken, mqttQoS);
- topicSubscribers.put(topicSubscriber.getTopic().getTopic(), topicSubscriber);
}
public TopicToken getTopicFilterToken() {
@@ -51,7 +45,11 @@ class TopicFilterSubscriber {
return mqttQoS;
}
- public Map getTopicSubscribers() {
+ public void setMqttQoS(MqttQoS mqttQoS) {
+ this.mqttQoS = mqttQoS;
+ }
+
+ public Map getTopicSubscribers() {
return topicSubscribers;
}
}
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java
index 5c1ed6c83645f2ecfe56ada25c78fb182a07bd90..09378fd580f6cd43bb63f8716df9b1eb2d20d362 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java
@@ -12,17 +12,20 @@ package org.smartboot.mqtt.broker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.smartboot.mqtt.broker.eventbus.ServerEventType;
import org.smartboot.mqtt.broker.provider.PersistenceProvider;
import org.smartboot.mqtt.broker.provider.impl.message.PersistenceMessage;
-import org.smartboot.mqtt.common.InflightMessage;
import org.smartboot.mqtt.common.InflightQueue;
import org.smartboot.mqtt.common.TopicToken;
import org.smartboot.mqtt.common.enums.MqttQoS;
import org.smartboot.mqtt.common.enums.MqttVersion;
import org.smartboot.mqtt.common.eventbus.EventType;
+import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage;
+import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader;
import org.smartboot.mqtt.common.message.variable.properties.PublishProperties;
import org.smartboot.mqtt.common.util.MqttMessageBuilders;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
/**
@@ -41,7 +44,7 @@ public class TopicSubscriber {
/**
* 服务端向客户端发送应用消息所允许的最大 QoS 等级
*/
- private final MqttQoS mqttQoS;
+ private MqttQoS mqttQoS;
/**
* 期望消费的点位
@@ -76,25 +79,27 @@ public class TopicSubscriber {
return;
}
semaphore.release();
- publish0(brokerContext, 0);
+ int i = 16;
+ while (publishAvailable(brokerContext)) {
+ if (i-- == 0) {
+ if (semaphore.tryAcquire()) {
+ topic.getQueue().offer(this);
+ topic.getVersion().incrementAndGet();
+ }
+ break;
+ }
+ }
mqttSession.flush();
}
- private void publish0(BrokerContext brokerContext, final int depth) {
+ private boolean publishAvailable(BrokerContext brokerContext) {
PersistenceProvider persistenceProvider = brokerContext.getProviders().getPersistenceProvider();
PersistenceMessage persistenceMessage = persistenceProvider.get(topic.getTopic(), nextConsumerOffset);
if (persistenceMessage == null) {
if (semaphore.tryAcquire()) {
topic.getQueue().offer(this);
}
- return;
- }
- if (depth > 16) {
- if (semaphore.tryAcquire()) {
- topic.getQueue().offer(this);
- topic.getVersion().incrementAndGet();
- }
- return;
+ return false;
}
MqttMessageBuilders.PublishBuilder publishBuilder = MqttMessageBuilders.publish().payload(persistenceMessage.getPayload()).qos(mqttQoS).topicName(persistenceMessage.getTopic());
@@ -109,38 +114,30 @@ public class TopicSubscriber {
//Qos0直接发送
if (mqttQoS == MqttQoS.AT_MOST_ONCE) {
mqttSession.write(publishBuilder.build(), false);
- publish0(brokerContext, depth + 1);
- return;
- }
- InflightMessage suc;
- if (depth == 0) {
- suc = inflightQueue.offer(publishBuilder, (mqtt) -> {
- //最早发送的消息若收到响应,则更新点位
- commitNextConsumerOffset(offset + 1);
- if (persistenceMessage.isRetained()) {
- setRetainConsumerOffset(getRetainConsumerOffset() + 1);
- }
- commitRetainConsumerTimestamp(persistenceMessage.getCreateTime());
- publish0(brokerContext, 1);
- }, () -> publish0(brokerContext, 0));
- } else {
- suc = inflightQueue.offer(publishBuilder, (mqtt) -> {
- //最早发送的消息若收到响应,则更新点位
- commitNextConsumerOffset(offset + 1);
- if (persistenceMessage.isRetained()) {
- setRetainConsumerOffset(getRetainConsumerOffset() + 1);
- }
- commitRetainConsumerTimestamp(persistenceMessage.getCreateTime());
- publish0(brokerContext, 1);
- });
+ return true;
}
- // 飞行队列已满
- if (suc != null) {
- //递归处理下一个消息
- publish0(brokerContext, depth + 1);
+ CompletableFuture> future = inflightQueue.offer(publishBuilder, () -> {
+ if (semaphore.tryAcquire()) {
+ topic.getQueue().offer(this);
+ topic.getVersion().incrementAndGet();
+ }
+ brokerContext.getEventBus().publish(ServerEventType.NOTIFY_TOPIC_PUSH, topic);
+ });
+ if (future == null) {
+ return false;
}
+ future.whenComplete((mqttPacketIdentifierMessage, throwable) -> {
+ //最早发送的消息若收到响应,则更新点位
+ commitNextConsumerOffset(offset + 1);
+ if (persistenceMessage.isRetained()) {
+ setRetainConsumerOffset(getRetainConsumerOffset() + 1);
+ }
+ commitRetainConsumerTimestamp(persistenceMessage.getCreateTime());
+ publishAvailable(brokerContext);
+ });
+ return true;
}
public BrokerTopic getTopic() {
@@ -191,4 +188,8 @@ public class TopicSubscriber {
public void disable() {
this.enable = false;
}
+
+ public void setMqttQoS(MqttQoS mqttQoS) {
+ this.mqttQoS = mqttQoS;
+ }
}
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ServerEventType.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ServerEventType.java
index d4bf3ef3995a72429dca8b3acd6e44796056950b..4fcc38d3f25d4282dca10c91fd0dfa27021ac959 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ServerEventType.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ServerEventType.java
@@ -86,6 +86,8 @@ public class ServerEventType extends EventType {
*/
public static final ServerEventType> CONNACK = new ServerEventType<>("connect");
+ public static final ServerEventType NOTIFY_TOPIC_PUSH = new ServerEventType<>("notify_topic_push");
+
protected ServerEventType(String name) {
super(name);
}
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/openapi/controller/DashBoardController.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/openapi/controller/DashBoardController.java
index a58408d7b3d1f1d3ef5cde265c50beb1450a317b..447b3bc9239f1cdaf6659bfb070f0bdd511a0e70 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/openapi/controller/DashBoardController.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/openapi/controller/DashBoardController.java
@@ -20,19 +20,12 @@ import org.smartboot.http.restful.annotation.RequestMapping;
import org.smartboot.mqtt.broker.BrokerConfigure;
import org.smartboot.mqtt.broker.BrokerContext;
import org.smartboot.mqtt.broker.BrokerRuntime;
-import org.smartboot.mqtt.broker.MqttSession;
import org.smartboot.mqtt.broker.openapi.OpenApi;
import org.smartboot.mqtt.broker.openapi.enums.BrokerStatueEnum;
import org.smartboot.mqtt.broker.openapi.to.BrokerNodeTO;
-import org.smartboot.mqtt.common.enums.MqttMetricEnum;
-import org.smartboot.mqtt.common.to.MetricItemTO;
-import org.smartboot.mqtt.common.to.MetricTO;
import java.lang.management.ManagementFactory;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
-import java.util.Date;
import java.util.List;
/**
@@ -52,30 +45,8 @@ public class DashBoardController {
}
@RequestMapping(OpenApi.DASHBOARD_OVERVIEW)
- public RestResult overview() {
- MetricTO metricTO = new MetricTO();
- Collection sessions = brokerContext.getSessions();
- Date date = new Date();
- //在线客户端数量
- MetricItemTO online = brokerContext.metric(MqttMetricEnum.CLIENT_ONLINE);
- online.setTime(date);
- metricTO.getMetric().put(MqttMetricEnum.CLIENT_ONLINE.getCode(), online);
-
- //主题数
- MetricItemTO topicCount = brokerContext.metric(MqttMetricEnum.TOPIC_COUNT);
- topicCount.setTime(date);
- metricTO.getMetric().put(MqttMetricEnum.TOPIC_COUNT.getCode(), topicCount);
-
- int subCount = 0;
- for (MqttSession session : sessions) {
- subCount += session.getSubscribers().size();
- }
- MetricItemTO subscribeTopicCount = new MetricItemTO();
- subscribeTopicCount.setCode("subscribe_topic_count");
- subscribeTopicCount.setValue(subCount);
- metricTO.getMetric().put(subscribeTopicCount.getCode(), subscribeTopicCount);
-
- return RestResult.ok(metricTO);
+ public RestResult> overview() {
+ return RestResult.fail(OpenApi.MESSAGE_UPGRADE);
}
@RequestMapping(OpenApi.DASHBOARD_NODES)
@@ -117,52 +88,8 @@ public class DashBoardController {
* @return
*/
@RequestMapping(OpenApi.DASHBOARD_METRICS)
- public RestResult metrics() {
- MetricTO metricTO = new MetricTO();
- //连接
- List connectionGroup = new ArrayList<>();
- metricTO.getGroup().put("connection", connectionGroup);
- connectionGroup.add(brokerContext.metric(MqttMetricEnum.CLIENT_CONNECT));
- connectionGroup.add(brokerContext.metric(MqttMetricEnum.CLIENT_DISCONNECT));
- connectionGroup.add(brokerContext.metric(MqttMetricEnum.CLIENT_SUBSCRIBE));
- connectionGroup.add(brokerContext.metric(MqttMetricEnum.CLIENT_UNSUBSCRIBE));
-
- //会话
- List sessionGroup = new ArrayList<>();
- metricTO.getGroup().put("session", sessionGroup);
- //认证与权限
- List accessGroup = new ArrayList<>();
- metricTO.getGroup().put("access", accessGroup);
-
- //流量收发
- List bytesGroup = new ArrayList<>();
- metricTO.getGroup().put("bytes", bytesGroup);
-
- //报文
- List packetGroup = new ArrayList<>();
- metricTO.getGroup().put("packet", packetGroup);
- packetGroup.add(brokerContext.metric(MqttMetricEnum.BYTES_RECEIVED));
- packetGroup.add(brokerContext.metric(MqttMetricEnum.BYTES_SENT));
- packetGroup.add(brokerContext.metric(MqttMetricEnum.PACKETS_RECEIVED));
- packetGroup.add(brokerContext.metric(MqttMetricEnum.PACKETS_SENT));
- packetGroup.add(brokerContext.metric(MqttMetricEnum.PACKETS_CONNECT_RECEIVED));
- packetGroup.add(brokerContext.metric(MqttMetricEnum.PACKETS_CONNACK_SENT));
-
-
- //消息数量
- List messageGroup = new ArrayList<>();
- metricTO.getGroup().put("message", messageGroup);
- messageGroup.add(brokerContext.metric(MqttMetricEnum.MESSAGE_QOS0_RECEIVED));
- messageGroup.add(brokerContext.metric(MqttMetricEnum.MESSAGE_QOS1_RECEIVED));
- messageGroup.add(brokerContext.metric(MqttMetricEnum.MESSAGE_QOS2_RECEIVED));
- messageGroup.add(brokerContext.metric(MqttMetricEnum.MESSAGE_QOS0_SENT));
- messageGroup.add(brokerContext.metric(MqttMetricEnum.MESSAGE_QOS1_SENT));
- messageGroup.add(brokerContext.metric(MqttMetricEnum.MESSAGE_QOS2_SENT));
-
- //消息分发
- List deliveryGroup = new ArrayList<>();
- metricTO.getGroup().put("delivery", deliveryGroup);
- return RestResult.ok(metricTO);
+ public RestResult> metrics() {
+ return RestResult.fail(OpenApi.MESSAGE_UPGRADE);
}
}
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/SubscribeProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/SubscribeProcessor.java
index b6c9f38ee967883509e25e29d29232c1837a0648..fa281f9c6291f01fdb82a676a4436c9e33fe2246 100644
--- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/SubscribeProcessor.java
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/SubscribeProcessor.java
@@ -60,6 +60,6 @@ public class SubscribeProcessor extends AuthorizedMqttProcessor subNode = new ConcurrentHashMap<>();
+
+ public BrokerTopic addTopic(String topic) {
+ BrokerTopic brokerTopic = new BrokerTopic(topic);
+ TopicToken topicToken = brokerTopic.getTopicToken();
+ TopicPublishTree treeNode = this;
+ while (true) {
+ treeNode = treeNode.subNode.computeIfAbsent(topicToken.getNode(), n -> new TopicPublishTree());
+ if (topicToken.getNextNode() == null) {
+ break;
+ } else {
+ topicToken = topicToken.getNextNode();
+ }
+ }
+ treeNode.brokerTopic = new BrokerTopic(topic);
+ return treeNode.brokerTopic;
+ }
+
+ public void match(TopicToken topicToken, Consumer consumer) {
+ match(this, topicToken, consumer);
+ }
+
+ private void match(TopicPublishTree treeNode, TopicToken topicToken, Consumer consumer) {
+ //匹配结束
+ if (topicToken == null) {
+ if (treeNode.brokerTopic != null) {
+ consumer.accept(treeNode.brokerTopic);
+ }
+ return;
+ }
+ //合法的#通配符必然存在于末端
+ if ("#".equals(topicToken.getNode())) {
+ treeNode.subNode.values().forEach(node -> {
+ subscribeChildren(node, consumer);
+ });
+ } else if ("+".equals(topicToken.getNode())) {
+ treeNode.subNode.values().forEach(node -> {
+ match(node, topicToken.getNextNode(), consumer);
+ });
+ } else {
+ TopicPublishTree node = treeNode.subNode.get(topicToken.getNode());
+ if (node != null) {
+ match(node, topicToken.getNextNode(), consumer);
+ }
+ }
+ }
+
+ private void subscribeChildren(TopicPublishTree treeNode, Consumer consumer) {
+ BrokerTopic brokerTopic = treeNode.brokerTopic;
+ if (brokerTopic != null) {
+ consumer.accept(brokerTopic);
+ }
+ //递归订阅Topic
+ treeNode.subNode.values().forEach(subNode -> subscribeChildren(subNode, consumer));
+ }
+}
diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java
new file mode 100644
index 0000000000000000000000000000000000000000..1ab0df22f8732e3756f44a638c20456bc5220e68
--- /dev/null
+++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright (C) [2022] smartboot [zhengjunweimail@163.com]
+ *
+ * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。
+ *
+ * Enterprise users are required to use this project reasonably
+ * and legally in accordance with the AGPL-3.0 open source agreement
+ * without special permission from the smartboot organization.
+ */
+
+package org.smartboot.mqtt.broker.topic;
+
+import org.smartboot.mqtt.broker.MqttSession;
+import org.smartboot.mqtt.broker.TopicFilterSubscriber;
+import org.smartboot.mqtt.common.TopicToken;
+import org.smartboot.mqtt.common.util.ValidateUtils;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
+
+/**
+ * @author 三刀(zhengjunweimail@163.com)
+ * @version V1.0 , 5/28/23
+ */
+public class TopicSubscribeTree {
+ private final Map subscribers = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap subNode = new ConcurrentHashMap<>();
+
+ public void subscribeTopic(MqttSession session, TopicFilterSubscriber subscriber) {
+ TopicSubscribeTree treeNode = this;
+ TopicToken token = subscriber.getTopicFilterToken();
+ do {
+ treeNode = treeNode.subNode.computeIfAbsent(token.getNode(), n -> new TopicSubscribeTree());
+ } while ((token = token.getNextNode()) != null);
+ treeNode.subscribers.put(session, subscriber);
+ }
+
+ public void unsubscribe(MqttSession session, TopicFilterSubscriber subscriber) {
+ TopicSubscribeTree subscribeTree = this;
+ TopicToken topicToken = subscriber.getTopicFilterToken();
+ while (true) {
+ subscribeTree = subscribeTree.subNode.get(topicToken.getNode());
+ if (topicToken.getNextNode() == null) {
+ break;
+ }
+ topicToken = topicToken.getNextNode();
+ }
+ subscribeTree.subscribers.remove(session);
+ }
+
+
+ public void match(TopicToken topicToken, BiConsumer consumer) {
+ //精确匹配
+ TopicSubscribeTree subscribeTree = subNode.get(topicToken.getNode());
+ if (subscribeTree != null) {
+ if (topicToken.getNextNode() == null) {
+ subscribers.forEach(consumer);
+ } else {
+ subscribeTree.match(topicToken.getNextNode(), consumer);
+ }
+ }
+ subscribeTree = subNode.get("#");
+ if (subscribeTree != null) {
+ ValidateUtils.isTrue(subscribeTree.subNode.isEmpty(), "'#' node must be empty");
+ subscribeTree.subscribers.forEach(consumer);
+ }
+
+ subscribeTree = subNode.get("+");
+ if (subscribeTree != null) {
+ if (topicToken.getNextNode() == null) {
+ subscribers.forEach(consumer);
+ } else {
+ subscribeTree.subNode.values().forEach(t -> match(topicToken.getNextNode(), consumer));
+ }
+ }
+ }
+}
diff --git a/smart-mqtt-client/pom.xml b/smart-mqtt-client/pom.xml
index b93a632bcb280a36d0d58843226da751386bd6d9..aa4dfbf9cd12a4aad0506a89a2763d9f4ebb1af4 100644
--- a/smart-mqtt-client/pom.xml
+++ b/smart-mqtt-client/pom.xml
@@ -5,7 +5,7 @@
smart-mqtt
org.smartboot.mqtt
- 0.20
+ 0.21
../pom.xml
4.0.0
@@ -27,5 +27,10 @@
junit
test
+
+ org.smartboot.mqtt
+ smart-mqtt-broker
+ test
+
\ No newline at end of file
diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java
index be39b3cfe0f5cb4a1f5df3d096f78a6d08660a50..c4160ddb4f2409b70d23c0d6f68f0687f21aaac6 100644
--- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java
+++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java
@@ -15,7 +15,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartboot.mqtt.common.AbstractSession;
import org.smartboot.mqtt.common.DefaultMqttWriter;
-import org.smartboot.mqtt.common.InflightMessage;
import org.smartboot.mqtt.common.InflightQueue;
import org.smartboot.mqtt.common.QosRetryPlugin;
import org.smartboot.mqtt.common.TopicToken;
@@ -28,6 +27,7 @@ import org.smartboot.mqtt.common.message.MqttConnAckMessage;
import org.smartboot.mqtt.common.message.MqttConnectMessage;
import org.smartboot.mqtt.common.message.MqttDisconnectMessage;
import org.smartboot.mqtt.common.message.MqttMessage;
+import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage;
import org.smartboot.mqtt.common.message.MqttPingReqMessage;
import org.smartboot.mqtt.common.message.MqttPingRespMessage;
import org.smartboot.mqtt.common.message.MqttPublishMessage;
@@ -38,6 +38,7 @@ import org.smartboot.mqtt.common.message.MqttUnsubAckMessage;
import org.smartboot.mqtt.common.message.payload.MqttConnectPayload;
import org.smartboot.mqtt.common.message.payload.WillMessage;
import org.smartboot.mqtt.common.message.variable.MqttConnectVariableHeader;
+import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader;
import org.smartboot.mqtt.common.message.variable.properties.ConnectProperties;
import org.smartboot.mqtt.common.message.variable.properties.PublishProperties;
import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties;
@@ -60,6 +61,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadFactory;
@@ -75,7 +77,12 @@ public class MqttClient extends AbstractSession {
* 客户端配置项
*/
private final MqttClientConfigure clientConfigure = new MqttClientConfigure();
- private final AbstractMessageProcessor messageProcessor = new MqttClientProcessor(this);
+ private static final AbstractMessageProcessor messageProcessor = new MqttClientProcessor();
+
+ static {
+ messageProcessor.addPlugin(new QosRetryPlugin());
+ }
+
/**
* 完成connect之前注册的事件
*/
@@ -85,6 +92,8 @@ public class MqttClient extends AbstractSession {
*/
private final Map subscribes = new ConcurrentHashMap<>();
+ private final Map mapping = new ConcurrentHashMap<>();
+
private final List wildcardsToken = new LinkedList<>();
private AioQuickClient client;
@@ -169,11 +178,11 @@ public class MqttClient extends AbstractSession {
if (mqttConnAckMessage.getVariableHeader().connectReturnCode() == MqttConnectReturnCode.CONNECTION_ACCEPTED) {
setInflightQueue(new InflightQueue(this, 16));
connected = true;
- consumeTask();
//重连情况下重新触发订阅逻辑
subscribes.forEach((k, v) -> {
subscribe(k, v.getQoS(), v.getConsumer());
});
+ consumeTask();
}
//客户端设置清理会话(CleanSession)标志为 0 重连时,客户端和服务端必须使用原始的报文标识符重发
//任何未确认的 PUBLISH 报文(如果 QoS>0)和 PUBREL 报文 [MQTT-4.4.0-1]。这是唯一要求客户端或
@@ -215,7 +224,7 @@ public class MqttClient extends AbstractSession {
}, clientConfigure.getKeepAliveInterval(), TimeUnit.SECONDS);
}
// messageProcessor.addPlugin(new StreamMonitorPlugin<>());
- messageProcessor.addPlugin(new QosRetryPlugin());
+
client = new AioQuickClient(clientConfigure.getHost(), clientConfigure.getPort(), new MqttProtocol(clientConfigure.getMaxPacketSize()), messageProcessor);
try {
if (bufferPagePool != null) {
@@ -223,7 +232,9 @@ public class MqttClient extends AbstractSession {
}
client.setReadBufferSize(clientConfigure.getBufferSize()).setWriteBuffer(clientConfigure.getBufferSize(), 8).connectTimeout(clientConfigure.getConnectionTimeout());
session = client.start(asynchronousChannelGroup);
- session.setAttachment(new Attachment());
+ Attachment attachment = new Attachment();
+ session.setAttachment(attachment);
+ attachment.put(MqttClientProcessor.SESSION_KEY, this);
setMqttVersion(clientConfigure.getMqttVersion());
mqttWriter = new DefaultMqttWriter(session.writeBuffer());
@@ -299,12 +310,17 @@ public class MqttClient extends AbstractSession {
unsubscribeBuilder.properties(properties);
}
// wait ack message.
- getInflightQueue().offer(unsubscribeBuilder, (message) -> {
+ CompletableFuture> future = getInflightQueue().offer(unsubscribeBuilder, () -> registeredTasks.offer(() -> unsubscribe0(topics)));
+ if (future == null) {
+ return;
+ }
+ future.whenComplete((message, throwable) -> {
ValidateUtils.isTrue(message instanceof MqttUnsubAckMessage, "uncorrected message type.");
for (String unsubscribedTopic : unsubscribedTopics) {
subscribes.remove(unsubscribedTopic);
wildcardsToken.removeIf(topicToken -> StringUtils.equals(unsubscribedTopic, topicToken.getTopicFilter()));
}
+ mapping.clear();
consumeTask();
});
flush();
@@ -343,7 +359,18 @@ public class MqttClient extends AbstractSession {
subscribeBuilder.subscribeProperties(new SubscribeProperties());
}
MqttSubscribeMessage subscribeMessage = subscribeBuilder.build();
- getInflightQueue().offer(subscribeBuilder, (message) -> {
+
+ CompletableFuture> future = getInflightQueue().offer(subscribeBuilder, new Runnable() {
+ @Override
+ public void run() {
+
+ }
+ });
+ if (future == null) {
+ registeredTasks.offer(() -> subscribe0(topic, qos, consumer, subAckConsumer));
+ return;
+ }
+ future.whenComplete((message, throwable) -> {
List qosValues = ((MqttSubAckMessage) message).getPayload().grantedQoSLevels();
ValidateUtils.isTrue(qosValues.size() == qos.length, "invalid response");
int i = 0;
@@ -360,11 +387,13 @@ public class MqttClient extends AbstractSession {
} else {
LOGGER.error("subscribe topic:{} fail", subscription.getTopicFilter());
}
+ mapping.clear();
subAckConsumer.accept(MqttClient.this, minQos);
}
consumeTask();
});
flush();
+
}
public void notifyResponse(MqttConnAckMessage connAckMessage) {
@@ -425,25 +454,8 @@ public class MqttClient extends AbstractSession {
consumer.accept(0);
return;
}
- InflightQueue inflightQueue = getInflightQueue();
- InflightMessage inflightMessage = inflightQueue.offer(publishBuilder, (message) -> {
- consumer.accept(message.getVariableHeader().getPacketId());
- //最早发送的消息若收到响应,则更新点位
- synchronized (MqttClient.this) {
- MqttClient.this.notifyAll();
- }
- });
- if (inflightMessage == null) {
- try {
- synchronized (this) {
- wait();
- }
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- publish(publishBuilder, consumer, autoFlush);
- return;
- }
+ CompletableFuture> future = inflightQueue.put(publishBuilder);
+ future.whenComplete((message, throwable) -> consumer.accept(message.getVariableHeader().getPacketId()));
if (autoFlush) {
flush();
}
@@ -457,6 +469,10 @@ public class MqttClient extends AbstractSession {
return subscribes;
}
+ public Map getMapping() {
+ return mapping;
+ }
+
public List getWildcardsToken() {
return wildcardsToken;
}
@@ -468,6 +484,9 @@ public class MqttClient extends AbstractSession {
*/
@Override
public void disconnect() {
+ if (disconnect) {
+ return;
+ }
//DISCONNECT 报文是客户端发给服务端的最后一个控制报文。表示客户端正常断开连接。
write(new MqttDisconnectMessage());
//关闭自动重连
diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClientProcessor.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClientProcessor.java
index b15876b6258c72556a0efc6d6447efed0427feee..e216c56340bd71d0d096e5b4f0e19f815b8473d9 100644
--- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClientProcessor.java
+++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClientProcessor.java
@@ -20,6 +20,7 @@ import org.smartboot.mqtt.client.processor.PubRelProcessor;
import org.smartboot.mqtt.client.processor.PublishProcessor;
import org.smartboot.mqtt.common.eventbus.EventObject;
import org.smartboot.mqtt.common.eventbus.EventType;
+import org.smartboot.mqtt.common.exception.MqttException;
import org.smartboot.mqtt.common.message.MqttConnAckMessage;
import org.smartboot.mqtt.common.message.MqttMessage;
import org.smartboot.mqtt.common.message.MqttPingRespMessage;
@@ -29,9 +30,12 @@ import org.smartboot.mqtt.common.message.MqttPubRecMessage;
import org.smartboot.mqtt.common.message.MqttPubRelMessage;
import org.smartboot.mqtt.common.message.MqttPublishMessage;
import org.smartboot.mqtt.common.message.MqttSubAckMessage;
+import org.smartboot.mqtt.common.util.MqttAttachKey;
import org.smartboot.socket.StateMachineEnum;
import org.smartboot.socket.extension.processor.AbstractMessageProcessor;
import org.smartboot.socket.transport.AioSession;
+import org.smartboot.socket.util.AttachKey;
+import org.smartboot.socket.util.Attachment;
import java.util.HashMap;
import java.util.Map;
@@ -42,7 +46,7 @@ import java.util.Map;
*/
public class MqttClientProcessor extends AbstractMessageProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(MqttClientProcessor.class);
- private final MqttClient mqttClient;
+ final static AttachKey SESSION_KEY = AttachKey.valueOf(MqttAttachKey.MQTT_SESSION);
private static final Map, MqttProcessor extends MqttMessage>> processors = new HashMap<>();
static {
@@ -52,21 +56,20 @@ public class MqttClientProcessor extends AbstractMessageProcessor {
processors.put(MqttPubRecMessage.class, new MqttAckProcessor());
processors.put(MqttPubCompMessage.class, new MqttAckProcessor());
processors.put(MqttPubRelMessage.class, new PubRelProcessor());
- processors.put(MqttSubAckMessage.class, new MqttAckProcessor());
+ processors.put(MqttSubAckMessage.class, new MqttAckProcessor());
processors.put(MqttPingRespMessage.class, new MqttPingRespProcessor());
}
- public MqttClientProcessor(MqttClient mqttClient) {
- this.mqttClient = mqttClient;
- }
@Override
public void process0(AioSession session, MqttMessage msg) {
- mqttClient.getEventBus().publish(EventType.RECEIVE_MESSAGE, EventObject.newEventObject(mqttClient, msg));
+ Attachment attachment = session.getAttachment();
+ MqttClient client = attachment.get(SESSION_KEY);
+ client.getEventBus().publish(EventType.RECEIVE_MESSAGE, EventObject.newEventObject(client, msg));
MqttProcessor processor = processors.get(msg.getClass());
// LOGGER.info("receive msg:{}", msg);
if (processor != null) {
- processor.process(mqttClient, msg);
+ processor.process(client, msg);
} else {
LOGGER.error("unknown msg:{}", msg);
}
@@ -74,9 +77,25 @@ public class MqttClientProcessor extends AbstractMessageProcessor {
@Override
public void stateEvent0(AioSession session, StateMachineEnum stateMachineEnum, Throwable throwable) {
-// System.out.println(stateMachineEnum);
- if (throwable != null) {
- throwable.printStackTrace();
+ switch (stateMachineEnum) {
+ case DECODE_EXCEPTION:
+ LOGGER.error("decode exception", throwable);
+ break;
+ case SESSION_CLOSED:
+ Attachment attachment = session.getAttachment();
+ attachment.get(SESSION_KEY).disconnect();
+ break;
+ case PROCESS_EXCEPTION:
+ if (throwable instanceof MqttException) {
+ LOGGER.warn("process exception", throwable);
+ ((MqttException) throwable).getCallback().run();
+ }
+ break;
+ default:
+ break;
}
+// if (throwable != null) {
+// throwable.printStackTrace();
+// }
}
}
diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java
index 3b053b0718d41ca3ac91eee311f8014845e39654..d69fd428b646c2eee0570a246942751d3b422c7e 100644
--- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java
+++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java
@@ -61,12 +61,18 @@ public class PublishProcessor implements MqttProcessor {
private void processPublishMessage(MqttPublishMessage mqttPublishMessage, MqttClient mqttClient) {
MqttPublishVariableHeader header = mqttPublishMessage.getVariableHeader();
- Subscribe subscribe = mqttClient.getSubscribes().get(header.getTopicName());
-
- //尝试通配符匹配
+ Subscribe subscribe = mqttClient.getMapping().get(header.getTopicName());
if (subscribe == null) {
- subscribe = matchWildcardsSubscribe(mqttClient, header.getTopicName());
+ subscribe = mqttClient.getSubscribes().get(header.getTopicName());
+ //尝试通配符匹配
+ if (subscribe == null) {
+ subscribe = matchWildcardsSubscribe(mqttClient, header.getTopicName());
+ }
+ if (subscribe != null) {
+ mqttClient.getMapping().put(header.getTopicName(), subscribe);
+ }
}
+
// If unsubscribed, maybe null.
if (subscribe != null && !subscribe.getUnsubscribed()) {
subscribe.getConsumer().accept(mqttClient, mqttPublishMessage);
diff --git a/smart-mqtt-client/src/test/java/org/smartboot/mqtt/client/Benchmark.java b/smart-mqtt-client/src/test/java/org/smartboot/mqtt/client/Benchmark.java
index 9b29430898431e0798e82b8d27055e5cbb39d206..bfa3262589362bd3f558621001ae6ee68163424e 100644
--- a/smart-mqtt-client/src/test/java/org/smartboot/mqtt/client/Benchmark.java
+++ b/smart-mqtt-client/src/test/java/org/smartboot/mqtt/client/Benchmark.java
@@ -1,8 +1,13 @@
package org.smartboot.mqtt.client;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.smartboot.mqtt.broker.BrokerContext;
+import org.smartboot.mqtt.broker.BrokerContextImpl;
import org.smartboot.mqtt.common.enums.MqttQoS;
import org.smartboot.mqtt.common.util.MqttUtil;
@@ -10,6 +15,7 @@ import java.io.IOException;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -17,18 +23,24 @@ import java.util.concurrent.atomic.AtomicLong;
* @version V1.0 , 2022/4/18
*/
public class Benchmark {
+ private static final Logger LOGGER = LoggerFactory.getLogger(Benchmark.class);
private final String host = "127.0.0.1";
private final int port = 1883;
private AsynchronousChannelGroup channelGroup;
+ private BrokerContext context;
+
@Before
public void init() throws IOException {
channelGroup = AsynchronousChannelGroup.withFixedThreadPool(4, r -> new Thread(r));
+ context = new BrokerContextImpl();
+ context.init();
}
@After
public void destroy() {
channelGroup.shutdown();
+ context.destroy();
}
/**
@@ -81,12 +93,38 @@ public class Benchmark {
System.out.println((System.currentTimeMillis() - s) + ":" + total.get());
}
+ @Test(timeout = 3000)
+ public void testSubscribe2() throws InterruptedException {
+ MqttClient client = new MqttClient(host, port, MqttUtil.createClientId());
+ client.connect(channelGroup);
+
+ int connectCount = 10000;
+ AtomicLong total = new AtomicLong(0);
+ CountDownLatch countDownLatch = new CountDownLatch(connectCount);
+ long s = System.currentTimeMillis();
+
+ while (connectCount-- > 0) {
+ long start = System.currentTimeMillis();
+ String topic = "/topic/" + connectCount;
+ client.subscribe(topic, MqttQoS.AT_MOST_ONCE, (mqttClient, publishMessage) -> {
+ }, (mqttClient, mqttQoS) -> {
+// LOGGER.info("subscribe:{}", "/topic/" + topic);
+ total.addAndGet(System.currentTimeMillis() - start);
+ countDownLatch.countDown();
+ });
+
+ }
+ countDownLatch.await();
+ client.disconnect();
+ System.out.println((System.currentTimeMillis() - s) + ":" + total.get());
+ }
+
/**
* 订阅topic成功后断开连接
*
* @throws InterruptedException
*/
- @Test
+ @Test(timeout = 5000)
public void testPublish() throws InterruptedException {
int connectCount = 100;
int publishCount = Short.MAX_VALUE;
@@ -117,7 +155,7 @@ public class Benchmark {
client.publish(topic, MqttQoS.AT_MOST_ONCE, payload, false);
}
System.out.println("publish finish!");
- publishDownLatch.await();
- System.out.println("publish use time: " + (System.currentTimeMillis() - startPublish));
+ Assert.assertTrue("wait result timeout", publishDownLatch.await(5, TimeUnit.SECONDS));
+ System.out.println("publish use time: " + (System.currentTimeMillis() - startPublish) + " count:" + publishDownLatch.getCount());
}
}
diff --git a/smart-mqtt-common/pom.xml b/smart-mqtt-common/pom.xml
index 806073bedb739ac80dea9befe92aa3f7910c5fd1..2a2c23d2c2580a00d988208b2f9448fd8f9a8c46 100644
--- a/smart-mqtt-common/pom.xml
+++ b/smart-mqtt-common/pom.xml
@@ -5,7 +5,7 @@
smart-mqtt
org.smartboot.mqtt
- 0.20
+ 0.21
../pom.xml
4.0.0
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java
index 8c7d78a6158197de8d9f761f10daff318e8d2335..3099892d327feac92320479db63159d5059292e9 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java
@@ -25,8 +25,7 @@ import org.smartboot.socket.util.Attachment;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Hashtable;
/**
* @author 三刀(zhengjunweimail@163.com)
@@ -54,8 +53,8 @@ public abstract class AbstractSession {
private MqttVersion mqttVersion;
- private InflightQueue inflightQueue;
- private final Map ackMessageCacheMap = new ConcurrentHashMap<>();
+ protected InflightQueue inflightQueue;
+ private final Hashtable ackMessageCacheMap = new Hashtable<>();
public AbstractSession(EventBus eventBus) {
this.eventBus = eventBus;
@@ -77,7 +76,7 @@ public abstract class AbstractSession {
public final synchronized void write(MqttMessage mqttMessage, boolean autoFlush) {
try {
if (disconnect) {
- this.disconnect();
+// this.disconnect();
ValidateUtils.isTrue(false, "已断开连接,无法发送消息");
}
mqttMessage.setVersion(mqttVersion);
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightMessage.java
index 168c10d77fbf377fc25fb38152e59822ccfb6b96..4d10632fce23ceb4a597fe3a02b9d9e8b70b3118 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightMessage.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightMessage.java
@@ -18,7 +18,7 @@ import org.smartboot.mqtt.common.message.MqttSubscribeMessage;
import org.smartboot.mqtt.common.message.MqttUnsubscribeMessage;
import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader;
-import java.util.function.Consumer;
+import java.util.concurrent.CompletableFuture;
/**
* @author 三刀(zhengjunweimail@163.com)
@@ -31,6 +31,7 @@ public class InflightMessage {
private final MqttPacketIdentifierMessage extends MqttPacketIdVariableHeader> originalMessage;
private MqttPacketIdentifierMessage extends MqttPacketIdVariableHeader> responseMessage;
+ private final CompletableFuture> future = new CompletableFuture<>();
/**
* 飞行队列为其分配的packetId
*/
@@ -40,16 +41,13 @@ public class InflightMessage {
private boolean commit;
- private final Consumer> consumer;
-
private int retryCount;
private long latestTime;
- public InflightMessage(int packetId, MqttPacketIdentifierMessage extends MqttPacketIdVariableHeader> originalMessage, Consumer> consumer) {
+ public InflightMessage(int packetId, MqttPacketIdentifierMessage extends MqttPacketIdVariableHeader> originalMessage) {
this.assignedPacketId = packetId;
this.originalMessage = originalMessage;
- this.consumer = consumer;
if (originalMessage instanceof MqttSubscribeMessage) {
this.expectMessageType = MqttMessageType.SUBACK;
} else if (originalMessage instanceof MqttUnsubscribeMessage) {
@@ -104,10 +102,6 @@ public class InflightMessage {
this.latestTime = latestTime;
}
- public final Consumer> getConsumer() {
- return consumer;
- }
-
public int getAssignedPacketId() {
return assignedPacketId;
}
@@ -120,4 +114,7 @@ public class InflightMessage {
this.responseMessage = responseMessage;
}
+ public CompletableFuture> getFuture() {
+ return future;
+ }
}
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java
index efb4f18992c0d53c7c2e566c80e470338add6859..9859587effad42794871fdf5ccd7b9b51e913b81 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java
@@ -14,24 +14,28 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartboot.mqtt.common.enums.MqttMessageType;
import org.smartboot.mqtt.common.enums.MqttVersion;
+import org.smartboot.mqtt.common.exception.MqttException;
import org.smartboot.mqtt.common.message.MqttFixedHeader;
import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage;
import org.smartboot.mqtt.common.message.MqttPubRelMessage;
import org.smartboot.mqtt.common.message.MqttPublishMessage;
+import org.smartboot.mqtt.common.message.MqttSubscribeMessage;
import org.smartboot.mqtt.common.message.MqttVariableMessage;
import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader;
import org.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader;
import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties;
+import org.smartboot.mqtt.common.util.MqttAttachKey;
import org.smartboot.mqtt.common.util.MqttMessageBuilders;
import org.smartboot.mqtt.common.util.ValidateUtils;
import org.smartboot.socket.util.AttachKey;
import org.smartboot.socket.util.Attachment;
import org.smartboot.socket.util.QuickTimerTask;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
/**
* @author 三刀(zhengjunweimail@163.com)
@@ -39,8 +43,8 @@ import java.util.function.Consumer;
*/
public class InflightQueue {
private static final Logger LOGGER = LoggerFactory.getLogger(InflightQueue.class);
- static final AttachKey RETRY_TASK_ATTACH_KEY = AttachKey.valueOf("retryTask");
- private static final int TIMEOUT = 3;
+ static final AttachKey RETRY_TASK_ATTACH_KEY = AttachKey.valueOf(MqttAttachKey.RETRY_TASK);
+ private static final int TIMEOUT = 30;
private final InflightMessage[] queue;
private int takeIndex;
private int putIndex;
@@ -49,7 +53,10 @@ public class InflightQueue {
private final AtomicInteger packetId = new AtomicInteger(0);
private final AbstractSession session;
- private final ConcurrentLinkedQueue runnables = new ConcurrentLinkedQueue<>();
+
+ private final ReentrantLock lock = new ReentrantLock(false);
+
+ private final Condition notFull = lock.newCondition();
public InflightQueue(AbstractSession session, int size) {
ValidateUtils.isTrue(size > 0, "inflight must >0");
@@ -57,40 +64,69 @@ public class InflightQueue {
this.session = session;
}
- public InflightMessage offer(MqttMessageBuilders.MessageBuilder publishBuilder, Consumer> consumer) {
- return offer(publishBuilder, consumer, null);
+ public CompletableFuture> put(MqttMessageBuilders.MessageBuilder publishBuilder) {
+ final ReentrantLock lock = this.lock;
+ try {
+ lock.lockInterruptibly();
+ while (count == queue.length) {
+ notFull.await();
+ }
+ return enqueue(publishBuilder);
+ } catch (Exception e) {
+ throw new MqttException("put message into inflight queue exception", e);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public CompletableFuture> offer(MqttMessageBuilders.MessageBuilder publishBuilder) {
+ return offer(publishBuilder, () -> {
+
+ });
}
- public InflightMessage offer(MqttMessageBuilders.MessageBuilder publishBuilder, Consumer> consumer, Runnable runnable) {
- InflightMessage inflightMessage;
- synchronized (this) {
+ public CompletableFuture> offer(MqttMessageBuilders.MessageBuilder publishBuilder, Runnable runnable) {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
if (count == queue.length) {
- if (runnable != null) {
- runnables.offer(runnable);
+ int i = putIndex - 1;
+ if (i < 0) {
+ i = queue.length - 1;
}
+ queue[i].getFuture().thenAccept(mqttPacketIdentifierMessage -> runnable.run());
return null;
+ } else {
+ return enqueue(publishBuilder);
}
- int id = packetId.incrementAndGet();
- // 16位无符号最大值65535
- if (id > 65535) {
- id = id % queue.length + queue.length;
- packetId.set(id);
- }
- MqttPacketIdentifierMessage mqttMessage = publishBuilder.packetId(id).build();
- inflightMessage = new InflightMessage(id, mqttMessage, consumer);
- queue[putIndex++] = inflightMessage;
- if (putIndex == queue.length) {
- putIndex = 0;
- }
- count++;
+ } finally {
+ lock.unlock();
+ }
+ }
- //启动消息质量监测
- if (count == 1) {
- retry(inflightMessage);
- }
+
+ public CompletableFuture> enqueue(MqttMessageBuilders.MessageBuilder publishBuilder) {
+
+ int id = packetId.incrementAndGet();
+ // 16位无符号最大值65535
+ if (id > 65535) {
+ id = id % queue.length + queue.length;
+ packetId.set(id);
+ }
+ MqttPacketIdentifierMessage mqttMessage = publishBuilder.packetId(id).build();
+ InflightMessage inflightMessage = new InflightMessage(id, mqttMessage);
+ queue[putIndex++] = inflightMessage;
+ if (putIndex == queue.length) {
+ putIndex = 0;
+ }
+ count++;
+
+ //启动消息质量监测
+ if (count == 1) {
+ retry(inflightMessage);
}
session.write(inflightMessage.getOriginalMessage(), count == queue.length);
- return inflightMessage;
+ return inflightMessage.getFuture();
}
/**
@@ -100,14 +136,14 @@ public class InflightQueue {
if (inflightMessage.isCommit() || session.isDisconnect()) {
return;
}
- QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(new Runnable() {
+ QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(new AsyncTask() {
@Override
- public void run() {
+ public void execute() {
if (inflightMessage.isCommit()) {
// System.out.println("message has commit,ignore retry monitor");
return;
}
- if (session.isDisconnect()) {
+ if (session.session.isInvalid()) {
LOGGER.debug("session is disconnect , pause qos monitor.");
return;
}
@@ -137,6 +173,11 @@ public class InflightQueue {
MqttPubRelMessage pubRelMessage = new MqttPubRelMessage(MqttFixedHeader.PUB_REL_HEADER_DUP, variableHeader);
session.write(pubRelMessage);
break;
+ case SUBACK:
+ MqttSubscribeMessage subscribeMessage = (MqttSubscribeMessage) inflightMessage.getOriginalMessage();
+ MqttSubscribeMessage dupSubscribeMessage = new MqttSubscribeMessage(MqttFixedHeader.SUBSCRIBE_HEADER_DUP, subscribeMessage.getVariableHeader(), subscribeMessage.getPayload());
+ session.write(dupSubscribeMessage);
+ break;
default:
throw new UnsupportedOperationException("invalid message type: " + inflightMessage.getExpectMessageType());
}
@@ -152,6 +193,10 @@ public class InflightQueue {
*/
public void notify(MqttPacketIdentifierMessage extends MqttPacketIdVariableHeader> message) {
InflightMessage inflightMessage = queue[(message.getVariableHeader().getPacketId() - 1) % queue.length];
+ if (inflightMessage == null) {
+ LOGGER.info("ignore duplicate message");
+ return;
+ }
switch (message.getFixedHeader().getMessageType()) {
case SUBACK:
case UNSUBACK:
@@ -163,7 +208,13 @@ public class InflightQueue {
}
inflightMessage.setResponseMessage(message);
inflightMessage.setLatestTime(System.currentTimeMillis());
- commit(inflightMessage);
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ commit(inflightMessage);
+ } finally {
+ lock.unlock();
+ }
break;
}
case PUBREC:
@@ -189,7 +240,7 @@ public class InflightQueue {
}
}
- public synchronized void commit(InflightMessage inflightMessage) {
+ private void commit(InflightMessage inflightMessage) {
MqttVariableMessage extends MqttPacketIdVariableHeader> originalMessage = inflightMessage.getOriginalMessage();
ValidateUtils.isTrue(originalMessage.getFixedHeader().getQosLevel().value() == 0 || originalMessage.getVariableHeader().getPacketId() == inflightMessage.getAssignedPacketId(), "invalid message");
inflightMessage.setCommit(true);
@@ -199,34 +250,28 @@ public class InflightQueue {
}
queue[takeIndex++] = null;
count--;
+
if (takeIndex == queue.length) {
takeIndex = 0;
}
- inflightMessage.getConsumer().accept(inflightMessage.getResponseMessage());
+ inflightMessage.getFuture().complete(inflightMessage.getResponseMessage());
while (count > 0 && queue[takeIndex].isCommit()) {
inflightMessage = queue[takeIndex];
- inflightMessage.getConsumer().accept(inflightMessage.getResponseMessage());
+ inflightMessage.getFuture().complete(inflightMessage.getResponseMessage());
queue[takeIndex++] = null;
if (takeIndex == queue.length) {
takeIndex = 0;
}
count--;
}
-
+ if (count < queue.length) {
+ notFull.signal();
+ }
if (count > 0) {
//注册超时监听任务
Attachment attachment = session.session.getAttachment();
InflightMessage monitorMessage = queue[takeIndex];
attachment.put(RETRY_TASK_ATTACH_KEY, () -> session.getInflightQueue().retry(monitorMessage));
}
-
- while (count < queue.length) {
- Runnable runnable = runnables.poll();
- if (runnable != null) {
- runnable.run();
- } else {
- break;
- }
- }
}
}
\ No newline at end of file
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MetricTypeEnum.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MetricTypeEnum.java
deleted file mode 100644
index 0553ab12543de67420265d8e4ccb1de5675b5a75..0000000000000000000000000000000000000000
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MetricTypeEnum.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Copyright (C) [2022] smartboot [zhengjunweimail@163.com]
- *
- * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。
- *
- * Enterprise users are required to use this project reasonably
- * and legally in accordance with the AGPL-3.0 open source agreement
- * without special permission from the smartboot organization.
- */
-
-package org.smartboot.mqtt.common.enums;
-
-/**
- * @author 三刀(zhengjunweimail@163.com)
- * @version V1.0 , 2023/2/19
- */
-public enum MetricTypeEnum {
- /**
- * 基础指标:指表达业务实体原子量化属性的且不可再分的概念集合
- */
- BASIC,
- /**
- * 复合指标:指建立在基础指标之上,通过一定运算规则形成的计算指标集合
- */
- COMPOSITE
-}
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttMetricEnum.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttMetricEnum.java
deleted file mode 100644
index b9609df08ad6642eaa5f536a64d113c002279ca4..0000000000000000000000000000000000000000
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttMetricEnum.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Copyright (C) [2022] smartboot [zhengjunweimail@163.com]
- *
- * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。
- *
- * Enterprise users are required to use this project reasonably
- * and legally in accordance with the AGPL-3.0 open source agreement
- * without special permission from the smartboot organization.
- */
-
-package org.smartboot.mqtt.common.enums;
-
-/**
- * @author 三刀(zhengjunweimail@163_com)
- * @version V1_0 , 2023/1/26
- */
-public enum MqttMetricEnum {
- CLIENT_ONLINE("client_online", "客户端在线数"),
- CLIENT_CONNECT("client_connect", "客户端连接次数"),
- CLIENT_DISCONNECT("client_disconnected", "客户端断开连接次数"),
- CLIENT_SUBSCRIBE("client_subscribe", "订阅次数"),
- CLIENT_UNSUBSCRIBE("client_unsubscribe", "取消订阅次数"),
- BYTES_RECEIVED("bytes_received", "已接收字节数"),
- BYTES_SENT("bytes_sent", "已发送字节数"),
-
- PACKETS_CONNECT_RECEIVED("packets_connect_received", "接收的 CONNECT 报文数量"),
- PACKETS_CONNACK_SENT("packets_connack_sent", "发送的 CONNACK 报文数量"),
-
- PACKETS_PUBLISH_RECEIVED("packets_publish_received", "接收的 PUBLISH 报文数量"),
- PACKETS_PUBLISH_SENT("packets_publish_sent", "发送的 PUBLISH 报文数量"),
-
- PACKETS_RECEIVED("packets_received", "接收的报文数量"),
- PACKETS_SENT("packets_sent", "发送的报文数量"),
-
-
- TOPIC_COUNT("topic_count", "Topic数量"),
-
- MESSAGE_QOS0_RECEIVED("messages_qos0_received", "接收来自客户端的 QoS 0 消息数量"),
- MESSAGE_QOS1_RECEIVED("messages_qos1_received", "接收来自客户端的 QoS 1 消息数量"),
- MESSAGE_QOS2_RECEIVED("messages_qos2_received", "接收来自客户端的 QoS 2 消息数量"),
- MESSAGE_QOS0_SENT("messages_qos0_sent", "发送给客户端的 QoS 0 消息数量"),
- MESSAGE_QOS1_SENT("messages_qos1_sent", "发送给客户端的 QoS 1 消息数量"),
- MESSAGE_QOS2_SENT("messages_qos2_sent", "发送给客户端的 QoS 2 消息数量"),
-
- PERIOD_MESSAGE_RECEIVED("period_message_received", "周期内接收消息数", MetricTypeEnum.COMPOSITE),
-
- PERIOD_MESSAGE_SENT("period_message_sent", "周期内发送消息数", MetricTypeEnum.COMPOSITE);
-
- private final String code;
- private final String desc;
-
- private final MetricTypeEnum type;
-
- MqttMetricEnum(String code, String desc, MetricTypeEnum type) {
- this.code = code;
- this.desc = desc;
- this.type = type;
- }
-
- MqttMetricEnum(String code, String desc) {
- this(code, desc, MetricTypeEnum.BASIC);
- }
-
- public String getCode() {
- return code;
- }
-
- public String getDesc() {
- return desc;
- }
-
- public MetricTypeEnum getType() {
- return type;
- }
-}
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttProtocol.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttProtocol.java
index 47769341fc5a9cb71b2ebf377176a37eab0b5c9b..07cffa571022360c6fb29fc50ad3e01f23456848 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttProtocol.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttProtocol.java
@@ -16,6 +16,7 @@ import org.smartboot.mqtt.common.enums.MqttMessageType;
import org.smartboot.mqtt.common.enums.MqttVersion;
import org.smartboot.mqtt.common.message.MqttFixedHeader;
import org.smartboot.mqtt.common.message.MqttMessage;
+import org.smartboot.mqtt.common.util.MqttAttachKey;
import org.smartboot.mqtt.common.util.ValidateUtils;
import org.smartboot.socket.Protocol;
import org.smartboot.socket.transport.AioSession;
@@ -36,8 +37,8 @@ public class MqttProtocol implements Protocol {
private static final Logger logger = LoggerFactory.getLogger(MqttProtocol.class);
private final int maxBytesInMessage;
- public static final AttachKey MQTT_VERSION_ATTACH_KEY = AttachKey.valueOf("mqtt_version");
- private static final AttachKey DECODE_UNIT_ATTACH_KEY = AttachKey.valueOf("decodeUnit");
+ public static final AttachKey MQTT_VERSION_ATTACH_KEY = AttachKey.valueOf(MqttAttachKey.MQTT_VERSION);
+ private static final AttachKey DECODE_UNIT_ATTACH_KEY = AttachKey.valueOf(MqttAttachKey.DECODE_UNIT);
public MqttProtocol(int maxBytesInMessage) {
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/to/MetricItemTO.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/to/MetricItemTO.java
deleted file mode 100644
index 9b031cfb45d44f1c5a100f6835852a6fd4a58a6f..0000000000000000000000000000000000000000
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/to/MetricItemTO.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Copyright (C) [2022] smartboot [zhengjunweimail@163.com]
- *
- * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。
- *
- * Enterprise users are required to use this project reasonably
- * and legally in accordance with the AGPL-3.0 open source agreement
- * without special permission from the smartboot organization.
- */
-
-package org.smartboot.mqtt.common.to;
-
-import com.alibaba.fastjson2.annotation.JSONField;
-import org.smartboot.mqtt.common.enums.MqttMetricEnum;
-
-import java.util.Date;
-import java.util.concurrent.atomic.LongAdder;
-
-/**
- * @author 三刀(zhengjunweimail@163.com)
- * @version V1.0 , 2023/1/26
- */
-public class MetricItemTO {
- /**
- * 指标编码
- */
- private String code;
- /**
- * 指标描述
- */
- private String desc;
-
- /**
- * 指标数据
- */
- @JSONField(serialize = false)
- private final LongAdder metric = new LongAdder();
-
- /**
- * 采集周期,单位:秒,非正整数表示禁用周期统计
- */
- private final int period;
- /**
- * 未启用周期采集改值为null
- */
- @JSONField(format = "yyyy-MM-dd HH:mm:ss")
- private Date time;
-
-
- public MetricItemTO() {
- this.period = 0;
- }
-
- public MetricItemTO(MqttMetricEnum metricEnum) {
- this(metricEnum, 0);
- }
-
- public MetricItemTO(MqttMetricEnum metricEnum, int period) {
- this.code = metricEnum.getCode();
- this.desc = metricEnum.getDesc();
- this.period = period;
- }
-
- public String getCode() {
- return code;
- }
-
- public void setCode(String code) {
- this.code = code;
- }
-
- public String getDesc() {
- return desc;
- }
-
- public void setDesc(String desc) {
- this.desc = desc;
- }
-
- public int getValue() {
- return metric.intValue();
- }
-
- public void setValue(int value) {
- metric.reset();
- metric.add(value);
- }
-
- public LongAdder getMetric() {
- return metric;
- }
-
- public int getPeriod() {
- return period;
- }
-
- public Date getTime() {
- return time;
- }
-
- public void setTime(Date time) {
- this.time = time;
- }
-}
diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/to/MetricTO.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttAttachKey.java
similarity index 42%
rename from smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/to/MetricTO.java
rename to smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttAttachKey.java
index 17b44d0562847859b62d31eed86260dbd86bcc3b..68142ccd896f13e44a13538ae53ac94bd1dfebdd 100644
--- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/to/MetricTO.java
+++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttAttachKey.java
@@ -8,33 +8,27 @@
* without special permission from the smartboot organization.
*/
-package org.smartboot.mqtt.common.to;
+package org.smartboot.mqtt.common.util;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import org.smartboot.socket.util.AttachKey;
/**
* @author 三刀(zhengjunweimail@163.com)
- * @version V1.0 , 2023/1/22
+ * @version V1.0 , 5/27/23
*/
-public class MetricTO {
- /**
- * 指标项
- */
- private final Map metric = new HashMap<>();
+public class MqttAttachKey {
+ public static final String MQTT_VERSION = "mqtt_version";
- /**
- * 指标分组
- */
- private final Map> group = new HashMap<>();
+ public static final String DECODE_UNIT = "decode_unit";
+ public static final String RETRY_TASK = "retry_task";
+ public static final String MQTT_SESSION = "mqtt_session";
-
- public Map> getGroup() {
- return group;
- }
-
- public Map getMetric() {
- return metric;
+ static {
+ AttachKey.reset();
+ AttachKey.valueOf(MQTT_SESSION);
+ AttachKey.valueOf(MQTT_VERSION);
+ AttachKey.valueOf(DECODE_UNIT);
+ AttachKey.valueOf(RETRY_TASK);
+ AttachKey.reset();
}
}