diff --git a/docker-compose.yml b/docker-compose.yml index dc762a834ec94f4f0c6ef942d774777b44c6ffd0..9367fa4ec3200f20a74cd4acc18a94346d996367 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -53,6 +53,6 @@ services: options: max-size: "100m" max-file: "1" -# command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=2000 -Dqos=2 -Dpublisher=10 -Dcount=1 -Dpayload=128 org.smartboot.bench.mqtt.Subscribe - command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=2000 -Dqos=0 -Dcount=10 -Dpayload=128 org.smartboot.bench.mqtt.Publish + command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=2000 -Dqos=2 -Dpublisher=10 -Dcount=1 -Dpayload=128 org.smartboot.bench.mqtt.Subscribe +# command: java -cp smart-mqtt-bench.jar -Dhost=mqtt-broker -Dconnect=2000 -Dqos=0 -Dcount=10 -Dpayload=128 org.smartboot.bench.mqtt.Publish version: '3.7' \ No newline at end of file diff --git a/plugins/pom.xml b/plugins/pom.xml index 24bee1331b7cd8f316cf93888bbb61e77d367947..bee091216462c50f10872c5e2ef5c1d2c60dc838 100644 --- a/plugins/pom.xml +++ b/plugins/pom.xml @@ -16,7 +16,7 @@ org.smartboot.mqtt smart-mqtt - 0.20 + 0.21 ../pom.xml pom diff --git a/plugins/redis-bridge-plugin/pom.xml b/plugins/redis-bridge-plugin/pom.xml index a89708bce7e8fa9b05a3d968b2af0de5fbbc8ed4..f4e16694b6ad5b6f2d9a1e9c84def2e413a8d367 100644 --- a/plugins/redis-bridge-plugin/pom.xml +++ b/plugins/redis-bridge-plugin/pom.xml @@ -3,7 +3,7 @@ plugins org.smartboot.mqtt - 0.20 + 0.21 ../pom.xml 4.0.0 diff --git a/pom.xml b/pom.xml index cacb49b895c2225db9da774b43fe88c3d99471c4..6e52628a2764665e6846beeb173b3cd5bfafc3f5 100644 --- a/pom.xml +++ b/pom.xml @@ -4,13 +4,13 @@ org.smartboot.mqtt smart-mqtt smart-mqtt - 0.20 + 0.21 4.0.0 mqtt broker - 0.20 - 1.5.27 + 0.21 + 1.5.29 1.1.22 2.6 4.3 @@ -66,7 +66,7 @@ com.alibaba.fastjson2 fastjson2 - 2.0.20.graal + 2.0.21.graal junit diff --git a/smart-mqtt-broker/pom.xml b/smart-mqtt-broker/pom.xml index e50e103d6e342f79e62b2f98a4b79f75a771de80..aacc07888dd035fbdb3ae17357429d74f670f55f 100644 --- a/smart-mqtt-broker/pom.xml +++ b/smart-mqtt-broker/pom.xml @@ -5,7 +5,7 @@ org.smartboot.mqtt smart-mqtt - 0.20 + 0.21 ../pom.xml 4.0.0 diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java index caa7f048ecd297e3113f5d880a02f1c965af34c9..408aa30e2b43cdd674a07353d5dd388bbab7f51e 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerConfigure.java @@ -11,8 +11,12 @@ package org.smartboot.mqtt.broker; import org.smartboot.mqtt.common.ToString; +import org.smartboot.mqtt.common.message.MqttMessage; +import org.smartboot.socket.extension.plugins.Plugin; import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; /** @@ -38,7 +42,7 @@ public class BrokerConfigure extends ToString { /** * 当前smart-mqtt */ - public static final String VERSION = "v0.20"; + public static final String VERSION = "v0.21"; static final Map SystemEnvironments = new HashMap<>(); @@ -100,6 +104,8 @@ public class BrokerConfigure extends ToString { */ private int maxInflight = 8; + private final List> plugins = new LinkedList<>(); + public int getPort() { return port; } @@ -188,6 +194,15 @@ public class BrokerConfigure extends ToString { this.name = name; } + public BrokerConfigure addPlugin(Plugin plugin) { + plugins.add(plugin); + return this; + } + + public List> getPlugins() { + return plugins; + } + @Override public String toString() { return "BrokerConfigure{" + diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java index 28c8cc490e1d6db3fe2e58791835cdda8336117a..1e68393344379b50b4def018e9ae3b9f2992a931 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContext.java @@ -11,13 +11,16 @@ package org.smartboot.mqtt.broker; import org.smartboot.mqtt.broker.eventbus.messagebus.MessageBus; +import org.smartboot.mqtt.broker.processor.MqttProcessor; import org.smartboot.mqtt.broker.provider.Providers; -import org.smartboot.mqtt.common.enums.MqttMetricEnum; +import org.smartboot.mqtt.broker.topic.TopicPublishTree; +import org.smartboot.mqtt.broker.topic.TopicSubscribeTree; import org.smartboot.mqtt.common.eventbus.EventBus; -import org.smartboot.mqtt.common.to.MetricItemTO; +import org.smartboot.mqtt.common.message.MqttMessage; import java.io.IOException; import java.util.Collection; +import java.util.Map; import java.util.concurrent.ScheduledExecutorService; /** @@ -84,11 +87,9 @@ public interface BrokerContext { */ T parseConfig(String path, Class clazz); - MqttBrokerMessageProcessor getMessageProcessor(); + Map, MqttProcessor> getMessageProcessors(); - /** - * 运行指标 - */ - MetricItemTO metric(MqttMetricEnum metricEnum); + TopicPublishTree getPublishTopicTree(); + TopicSubscribeTree getTopicSubscribeTree(); } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java index 8b61bb1632f865c8cca1c151e550eba3bb6f9854..bdf8b6fade4e74403fa5accc3c6c2f847d6873d2 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java @@ -13,6 +13,7 @@ package org.smartboot.mqtt.broker; import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONPath; import com.alibaba.fastjson2.JSONReader; +import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,33 +24,49 @@ import org.smartboot.mqtt.broker.eventbus.messagebus.MessageBus; import org.smartboot.mqtt.broker.eventbus.messagebus.MessageBusSubscriber; import org.smartboot.mqtt.broker.eventbus.messagebus.consumer.RetainPersistenceConsumer; import org.smartboot.mqtt.broker.plugin.Plugin; +import org.smartboot.mqtt.broker.processor.ConnectProcessor; +import org.smartboot.mqtt.broker.processor.DisConnectProcessor; +import org.smartboot.mqtt.broker.processor.MqttAckProcessor; +import org.smartboot.mqtt.broker.processor.MqttProcessor; +import org.smartboot.mqtt.broker.processor.PingReqProcessor; +import org.smartboot.mqtt.broker.processor.PubRelProcessor; +import org.smartboot.mqtt.broker.processor.PublishProcessor; +import org.smartboot.mqtt.broker.processor.SubscribeProcessor; +import org.smartboot.mqtt.broker.processor.UnSubscribeProcessor; import org.smartboot.mqtt.broker.provider.Providers; import org.smartboot.mqtt.broker.provider.impl.message.PersistenceMessage; +import org.smartboot.mqtt.broker.topic.TopicPublishTree; +import org.smartboot.mqtt.broker.topic.TopicSubscribeTree; import org.smartboot.mqtt.common.AsyncTask; import org.smartboot.mqtt.common.InflightQueue; import org.smartboot.mqtt.common.QosRetryPlugin; -import org.smartboot.mqtt.common.enums.MqttMetricEnum; import org.smartboot.mqtt.common.enums.MqttQoS; import org.smartboot.mqtt.common.enums.MqttVersion; import org.smartboot.mqtt.common.eventbus.EventBus; import org.smartboot.mqtt.common.eventbus.EventBusImpl; import org.smartboot.mqtt.common.eventbus.EventBusSubscriber; import org.smartboot.mqtt.common.eventbus.EventType; -import org.smartboot.mqtt.common.message.MqttConnAckMessage; import org.smartboot.mqtt.common.message.MqttConnectMessage; +import org.smartboot.mqtt.common.message.MqttDisconnectMessage; import org.smartboot.mqtt.common.message.MqttMessage; +import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage; +import org.smartboot.mqtt.common.message.MqttPingReqMessage; +import org.smartboot.mqtt.common.message.MqttPubAckMessage; +import org.smartboot.mqtt.common.message.MqttPubCompMessage; +import org.smartboot.mqtt.common.message.MqttPubRecMessage; +import org.smartboot.mqtt.common.message.MqttPubRelMessage; import org.smartboot.mqtt.common.message.MqttPublishMessage; +import org.smartboot.mqtt.common.message.MqttSubscribeMessage; +import org.smartboot.mqtt.common.message.MqttUnsubscribeMessage; +import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader; import org.smartboot.mqtt.common.message.variable.properties.PublishProperties; import org.smartboot.mqtt.common.protocol.MqttProtocol; -import org.smartboot.mqtt.common.to.MetricItemTO; import org.smartboot.mqtt.common.util.MqttMessageBuilders; import org.smartboot.mqtt.common.util.MqttUtil; import org.smartboot.mqtt.common.util.ValidateUtils; import org.smartboot.socket.buffer.BufferPagePool; import org.smartboot.socket.enhance.EnhanceAsynchronousChannelProvider; -import org.smartboot.socket.extension.plugins.AbstractPlugin; import org.smartboot.socket.transport.AioQuickServer; -import org.smartboot.socket.transport.AioSession; import org.yaml.snakeyaml.Yaml; import java.io.IOException; @@ -69,6 +86,7 @@ import java.util.Properties; import java.util.ServiceLoader; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; @@ -93,6 +111,9 @@ public class BrokerContextImpl implements BrokerContext { */ private final ConcurrentMap topicMap = new ConcurrentHashMap<>(); private BrokerConfigure brokerConfigure = new BrokerConfigure(); + private final TopicPublishTree topicPublishTree = new TopicPublishTree(); + + private final TopicSubscribeTree subscribeTopicTree = new TopicSubscribeTree(); /** * Keep-Alive监听线程 */ @@ -118,12 +139,25 @@ public class BrokerContextImpl implements BrokerContext { private String configJson; private final static BrokerTopic SHUTDOWN_TOPIC = new BrokerTopic(""); - /** - * 统计指标 - */ - private final Map metricMap = new HashMap<>(); private AsynchronousChannelGroup asynchronousChannelGroup; + private final Map, MqttProcessor> processors; + + { + Map, MqttProcessor> mqttProcessors = new HashMap<>(); + mqttProcessors.put(MqttPingReqMessage.class, new PingReqProcessor()); + mqttProcessors.put(MqttConnectMessage.class, new ConnectProcessor()); + mqttProcessors.put(MqttPublishMessage.class, new PublishProcessor()); + mqttProcessors.put(MqttSubscribeMessage.class, new SubscribeProcessor()); + mqttProcessors.put(MqttUnsubscribeMessage.class, new UnSubscribeProcessor()); + mqttProcessors.put(MqttPubAckMessage.class, new MqttAckProcessor<>()); + mqttProcessors.put(MqttPubRelMessage.class, new PubRelProcessor()); + mqttProcessors.put(MqttPubRecMessage.class, new MqttAckProcessor<>()); + mqttProcessors.put(MqttPubCompMessage.class, new MqttAckProcessor<>()); + mqttProcessors.put(MqttDisconnectMessage.class, new DisConnectProcessor()); + processors = MapUtils.unmodifiableMap(mqttProcessors); + } + @Override public void init() throws IOException { @@ -135,8 +169,6 @@ public class BrokerContextImpl implements BrokerContext { initPushThread(); - initMetric(); - loadAndInstallPlugins(); @@ -150,7 +182,8 @@ public class BrokerContextImpl implements BrokerContext { } }); pagePool = new BufferPagePool(10 * 1024 * 1024, brokerConfigure.getThreadNum(), true); - processor.addPlugin(new QosRetryPlugin()); + brokerConfigure.addPlugin(new QosRetryPlugin()); + brokerConfigure.getPlugins().forEach(processor::addPlugin); server = new AioQuickServer(brokerConfigure.getHost(), brokerConfigure.getPort(), new MqttProtocol(brokerConfigure.getMaxPacketSize()), processor); server.setBannerEnabled(false).setReadBufferSize(brokerConfigure.getBufferSize()).setWriteBuffer(brokerConfigure.getBufferSize(), Math.min(brokerConfigure.getMaxInflight(), 16)).setBufferPagePool(pagePool).setThreadNum(Math.max(2, brokerConfigure.getThreadNum())); server.start(asynchronousChannelGroup); @@ -182,69 +215,6 @@ public class BrokerContextImpl implements BrokerContext { configJson = null; } - private void initMetric() { - for (MqttMetricEnum metricEnum : MqttMetricEnum.values()) { - metricMap.put(metricEnum, new MetricItemTO(metricEnum)); - } - - processor.addPlugin(new AbstractPlugin() { - @Override - public void afterRead(AioSession session, int readSize) { - if (readSize > 0) { - metricMap.get(MqttMetricEnum.BYTES_RECEIVED).getMetric().add(readSize); - } - } - - @Override - public void afterWrite(AioSession session, int writeSize) { - if (writeSize > 0) { - metricMap.get(MqttMetricEnum.BYTES_SENT).getMetric().add(writeSize); - } - } - }); - eventBus.subscribe(ServerEventType.CONNECT, (eventType, object) -> metricMap.get(MqttMetricEnum.CLIENT_CONNECT).getMetric().increment()); - eventBus.subscribe(ServerEventType.DISCONNECT, (eventType, object) -> metricMap.get(MqttMetricEnum.CLIENT_DISCONNECT).getMetric().increment()); - eventBus.subscribe(ServerEventType.SUBSCRIBE_ACCEPT, (eventType, object) -> metricMap.get(MqttMetricEnum.CLIENT_SUBSCRIBE).getMetric().increment()); - eventBus.subscribe(ServerEventType.UNSUBSCRIBE_ACCEPT, (eventType, object) -> metricMap.get(MqttMetricEnum.CLIENT_UNSUBSCRIBE).getMetric().increment()); - eventBus.subscribe(EventType.RECEIVE_MESSAGE, (eventType, object) -> { - metricMap.get(MqttMetricEnum.PACKETS_RECEIVED).getMetric().increment(); - if (object.getObject() instanceof MqttConnectMessage) { - metricMap.get(MqttMetricEnum.PACKETS_CONNECT_RECEIVED).getMetric().increment(); - } - }); - eventBus.subscribe(EventType.WRITE_MESSAGE, (eventType, object) -> { - metricMap.get(MqttMetricEnum.PACKETS_SENT).getMetric().increment(); - if (object.getObject() instanceof MqttConnAckMessage) { - metricMap.get(MqttMetricEnum.PACKETS_CONNACK_SENT).getMetric().increment(); - } else if (object.getObject() instanceof MqttPublishMessage) { - switch (object.getObject().getFixedHeader().getQosLevel()) { - case AT_MOST_ONCE: - metricMap.get(MqttMetricEnum.MESSAGE_QOS0_SENT).getMetric().increment(); - break; - case AT_LEAST_ONCE: - metricMap.get(MqttMetricEnum.MESSAGE_QOS1_SENT).getMetric().increment(); - break; - case EXACTLY_ONCE: - metricMap.get(MqttMetricEnum.MESSAGE_QOS2_SENT).getMetric().increment(); - break; - } - } - }); - messageBusSubscriber.consumer((brokerContext1, publishMessage) -> { - switch (publishMessage.getFixedHeader().getQosLevel()) { - case AT_MOST_ONCE: - metricMap.get(MqttMetricEnum.MESSAGE_QOS0_RECEIVED).getMetric().increment(); - break; - case AT_LEAST_ONCE: - metricMap.get(MqttMetricEnum.MESSAGE_QOS1_RECEIVED).getMetric().increment(); - break; - case EXACTLY_ONCE: - metricMap.get(MqttMetricEnum.MESSAGE_QOS2_RECEIVED).getMetric().increment(); - break; - } - - }); - } private final TopicSubscriber BREAK = new TopicSubscriber(null, null, null, 0, 0); @@ -344,6 +314,8 @@ public class BrokerContextImpl implements BrokerContext { notifyPush(brokerTopic); }); + eventBus.subscribe(ServerEventType.NOTIFY_TOPIC_PUSH, (eventType, object) -> notifyPush(object)); + //一个新的订阅建立时,对每个匹配的主题名,如果存在最近保留的消息,它必须被发送给这个订阅者 eventBus.subscribe(ServerEventType.SUBSCRIBE_TOPIC, new EventBusSubscriber() { @Override @@ -357,12 +329,6 @@ public class BrokerContextImpl implements BrokerContext { BrokerTopic topic = subscriber.getTopic(); topic.getQueue().offer(subscriber); notifyPush(topic); -// -// int preVersion = subscriber.getTopic().getVersion().get(); -// subscriber.batchPublish(BrokerContextImpl.this); -// if (preVersion != subscriber.getTopic().getVersion().get()) { -// notifyPush(subscriber.getTopic()); -// } return; } //retain采用严格顺序publish模式 @@ -382,7 +348,8 @@ public class BrokerContextImpl implements BrokerContext { } InflightQueue inflightQueue = session.getInflightQueue(); // retain消息逐个推送 - inflightQueue.offer(publishBuilder, (mqtt) -> { + CompletableFuture> future = inflightQueue.offer(publishBuilder); + future.whenComplete((mqttPacketIdentifierMessage, throwable) -> { LOGGER.info("publish retain to client:{} success ", session.getClientId()); subscriber.setRetainConsumerOffset(offset + 1); retainPushThreadPool.execute(task); @@ -392,10 +359,13 @@ public class BrokerContextImpl implements BrokerContext { }); } }); - eventBus.subscribe(ServerEventType.SUBSCRIBE_REFRESH_TOPIC, (eventType, subscriber) -> { - LOGGER.info("刷新订阅关系, {} 订阅了topic: {}", subscriber.getTopicFilterToken().getTopicFilter(), subscriber.getTopic().getTopic()); - subscriber.getTopic().getQueue().offer(subscriber); - }); + + eventBus.subscribe(ServerEventType.TOPIC_CREATE, (eventType, object) -> subscribeTopicTree.match(object.getTopicToken(), (session, topicFilterSubscriber) -> { + if (!providers.getSubscribeProvider().subscribeTopic(object.getTopic(), session)) { + return; + } + session.subscribeSuccess(topicFilterSubscriber.getMqttQoS(), topicFilterSubscriber.getTopicFilterToken(), object); + })); } private void notifyPush(BrokerTopic topic) { @@ -478,9 +448,8 @@ public class BrokerContextImpl implements BrokerContext { public BrokerTopic getOrCreateTopic(String topic) { return topicMap.computeIfAbsent(topic, topicName -> { ValidateUtils.isTrue(!MqttUtil.containsTopicWildcards(topicName), "invalid topicName: " + topicName); - BrokerTopic newTopic = new BrokerTopic(topicName); + BrokerTopic newTopic = topicPublishTree.addTopic(topic); eventBus.publish(ServerEventType.TOPIC_CREATE, newTopic); - metric(MqttMetricEnum.TOPIC_COUNT).getMetric().increment(); return newTopic; }); } @@ -548,6 +517,16 @@ public class BrokerContextImpl implements BrokerContext { } } + @Override + public TopicPublishTree getPublishTopicTree() { + return topicPublishTree; + } + + @Override + public TopicSubscribeTree getTopicSubscribeTree() { + return subscribeTopicTree; + } + public void loadYamlConfig() throws IOException { String brokerConfig = System.getProperty(BrokerConfigure.SystemProperty.BrokerConfig); InputStream inputStream = null; @@ -567,16 +546,13 @@ public class BrokerContextImpl implements BrokerContext { } } - @Override - public MqttBrokerMessageProcessor getMessageProcessor() { - return processor; - } @Override - public MetricItemTO metric(MqttMetricEnum metricEnum) { - return metricMap.get(metricEnum); + public Map, MqttProcessor> getMessageProcessors() { + return processors; } + @Override public void destroy() { LOGGER.info("destroy broker..."); diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java index d781f8b17304b6376ca2c5688daf55f04937d2f6..8bfae4a5e96dc115d504cab97b713ddb57ca1fc9 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttBrokerMessageProcessor.java @@ -12,40 +12,19 @@ package org.smartboot.mqtt.broker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.smartboot.mqtt.broker.processor.ConnectProcessor; -import org.smartboot.mqtt.broker.processor.DisConnectProcessor; -import org.smartboot.mqtt.broker.processor.MqttAckProcessor; import org.smartboot.mqtt.broker.processor.MqttProcessor; -import org.smartboot.mqtt.broker.processor.PingReqProcessor; -import org.smartboot.mqtt.broker.processor.PubRelProcessor; -import org.smartboot.mqtt.broker.processor.PublishProcessor; -import org.smartboot.mqtt.broker.processor.SubscribeProcessor; -import org.smartboot.mqtt.broker.processor.UnSubscribeProcessor; import org.smartboot.mqtt.common.DefaultMqttWriter; -import org.smartboot.mqtt.common.enums.MqttMetricEnum; import org.smartboot.mqtt.common.eventbus.EventObject; import org.smartboot.mqtt.common.eventbus.EventType; import org.smartboot.mqtt.common.exception.MqttException; -import org.smartboot.mqtt.common.message.MqttConnectMessage; -import org.smartboot.mqtt.common.message.MqttDisconnectMessage; import org.smartboot.mqtt.common.message.MqttMessage; -import org.smartboot.mqtt.common.message.MqttPingReqMessage; -import org.smartboot.mqtt.common.message.MqttPubAckMessage; -import org.smartboot.mqtt.common.message.MqttPubCompMessage; -import org.smartboot.mqtt.common.message.MqttPubRecMessage; -import org.smartboot.mqtt.common.message.MqttPubRelMessage; -import org.smartboot.mqtt.common.message.MqttPublishMessage; -import org.smartboot.mqtt.common.message.MqttSubscribeMessage; -import org.smartboot.mqtt.common.message.MqttUnsubscribeMessage; +import org.smartboot.mqtt.common.util.MqttAttachKey; import org.smartboot.socket.StateMachineEnum; import org.smartboot.socket.extension.processor.AbstractMessageProcessor; import org.smartboot.socket.transport.AioSession; +import org.smartboot.socket.util.AttachKey; import org.smartboot.socket.util.Attachment; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - /** * @author 三刀 * @version V1.0 , 2018/4/24 @@ -56,25 +35,9 @@ public class MqttBrokerMessageProcessor extends AbstractMessageProcessor onlineSessions = new ConcurrentHashMap<>(); - private final Map, MqttProcessor> processorMap = new HashMap<>(); - { - processorMap.put(MqttPingReqMessage.class, new PingReqProcessor()); - processorMap.put(MqttConnectMessage.class, new ConnectProcessor()); - processorMap.put(MqttPublishMessage.class, new PublishProcessor()); - processorMap.put(MqttSubscribeMessage.class, new SubscribeProcessor()); - processorMap.put(MqttUnsubscribeMessage.class, new UnSubscribeProcessor()); - processorMap.put(MqttPubAckMessage.class, new MqttAckProcessor<>()); - processorMap.put(MqttPubRelMessage.class, new PubRelProcessor()); - processorMap.put(MqttPubRecMessage.class, new MqttAckProcessor<>()); - processorMap.put(MqttPubCompMessage.class, new MqttAckProcessor<>()); - processorMap.put(MqttDisconnectMessage.class, new DisConnectProcessor()); -// addPlugin(new RateLimiterPlugin<>(1024 * 512, 1024 * 512)); - } + private final static AttachKey SESSION_KEY = AttachKey.valueOf(MqttAttachKey.MQTT_SESSION); + public MqttBrokerMessageProcessor(BrokerContext mqttContext) { this.mqttContext = mqttContext; @@ -82,9 +45,10 @@ public class MqttBrokerMessageProcessor extends AbstractMessageProcessor getOnlineSessions() { - return onlineSessions; - } - } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java index 713295a415134565879b025863aa275f18a1484e..02a990fcbdeeb3226bbf681b34cd86a7a5a22ae1 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/MqttSession.java @@ -18,11 +18,9 @@ import org.smartboot.mqtt.common.AbstractSession; import org.smartboot.mqtt.common.MqttWriter; import org.smartboot.mqtt.common.TopicToken; import org.smartboot.mqtt.common.enums.MqttQoS; -import org.smartboot.mqtt.common.eventbus.EventBusSubscriber; import org.smartboot.mqtt.common.eventbus.EventType; import org.smartboot.mqtt.common.message.MqttPublishMessage; import org.smartboot.mqtt.common.message.variable.properties.ConnectProperties; -import org.smartboot.mqtt.common.util.MqttUtil; import org.smartboot.mqtt.common.util.ValidateUtils; import org.smartboot.socket.transport.AioSession; @@ -134,71 +132,49 @@ public class MqttSession extends AbstractSession { public MqttQoS subscribe(String topicFilter, MqttQoS mqttQoS) { if (mqttContext.getProviders().getSubscribeProvider().subscribeTopic(topicFilter, this)) { - subscribe0(topicFilter, mqttQoS, true); + subscribe0(topicFilter, mqttQoS); return mqttQoS; } else { return MqttQoS.FAILURE; } } - private void subscribe0(String topicFilter, MqttQoS mqttQoS, boolean newSubscribe) { + private void subscribe0(String topicFilter, MqttQoS mqttQoS) { + TopicFilterSubscriber subscriber = subscribers.get(topicFilter); + if (subscriber != null) { + subscriber.setMqttQoS(mqttQoS); + subscriber.getTopicSubscribers().values().forEach(sub -> sub.setMqttQoS(mqttQoS)); + return; + } TopicToken topicToken = new TopicToken(topicFilter); - //精准匹配 if (!topicToken.isWildcards()) { - BrokerTopic topic = mqttContext.getOrCreateTopic(topicToken.getTopicFilter());//可能会先触发TopicFilterSubscriber.subscribe - TopicSubscriber subscription = subscribeSuccess(mqttQoS, topicToken, topic); - if (newSubscribe) { - mqttContext.getEventBus().publish(ServerEventType.SUBSCRIBE_TOPIC, subscription); - } else { - mqttContext.getEventBus().publish(ServerEventType.SUBSCRIBE_REFRESH_TOPIC, subscription); - } - return; + mqttContext.getOrCreateTopic(topicFilter); } - - //通配符匹配存量Topic - for (BrokerTopic topic : mqttContext.getTopics()) { - if (MqttUtil.match(topic.getTopicToken(), topicToken) && mqttContext.getProviders().getSubscribeProvider().subscribeTopic(topic.getTopic(), this)) { - TopicSubscriber subscription = subscribeSuccess(mqttQoS, topicToken, topic); - if (newSubscribe) { - mqttContext.getEventBus().publish(ServerEventType.SUBSCRIBE_TOPIC, subscription); - } else { - mqttContext.getEventBus().publish(ServerEventType.SUBSCRIBE_REFRESH_TOPIC, subscription); + subscriber = new TopicFilterSubscriber(topicToken, mqttQoS); + TopicFilterSubscriber preSubscriber = subscribers.put(topicFilter, subscriber); + ValidateUtils.isTrue(preSubscriber == null, "duplicate topic filter"); + mqttContext.getTopicSubscribeTree().subscribeTopic(this, subscriber); + mqttContext.getPublishTopicTree().match(topicToken, topic -> subscribeSuccess(mqttQoS, topicToken, topic)); + } + + public void subscribeSuccess(MqttQoS mqttQoS, TopicToken topicToken, BrokerTopic topic) { + TopicSubscriber topicSubscriber = topic.getConsumeOffsets().get(this); + if (topicSubscriber != null) { + //此前的订阅关系 + TopicToken preToken = topicSubscriber.getTopicFilterToken(); + if (preToken.isWildcards()) { + if (!topicToken.isWildcards() || topicToken.getTopicFilter().length() > preToken.getTopicFilter().length()) { + //解除旧的订阅关系 + TopicSubscriber preSubscription = subscribers.get(preToken.getTopicFilter()).getTopicSubscribers().remove(topic); + preSubscription.setMqttQoS(mqttQoS); + preSubscription.setTopicFilterToken(topicToken); + //绑定新的订阅关系 + subscribers.get(topicToken.getTopicFilter()).getTopicSubscribers().put(topic, preSubscription); + mqttContext.getEventBus().publish(ServerEventType.SUBSCRIBE_REFRESH_TOPIC, preSubscription); } } + return; } - - //通配符匹配增量Topic - if (!subscribers.containsKey(topicFilter)) { - subscribers.put(topicFilter, new TopicFilterSubscriber(topicToken, mqttQoS)); - } - if (newSubscribe) { - mqttContext.getEventBus().subscribe(ServerEventType.TOPIC_CREATE, new EventBusSubscriber() { - @Override - public boolean enable() { - boolean enable = !disconnect && subscribers.containsKey(topicFilter); - if (!enable) { - LOGGER.info("current event is disable,quit topic:{} monitor", topicFilter); - } - return enable; - } - - @Override - public void subscribe(EventType eventType, BrokerTopic object) { - if (MqttUtil.match(object.getTopicToken(), topicToken) && mqttContext.getProviders().getSubscribeProvider().subscribeTopic(object.getTopic(), MqttSession.this)) { - TopicSubscriber subscription = MqttSession.this.subscribeSuccess(mqttQoS, topicToken, object); - mqttContext.getEventBus().publish(ServerEventType.SUBSCRIBE_TOPIC, subscription); - } - } - }); - } - } - - /** - * retain消息消费点位记录 - */ - private final Map retainOffsetCache = new HashMap<>(); - - private TopicSubscriber subscribeSuccess(MqttQoS mqttQoS, TopicToken topicToken, BrokerTopic topic) { long latestOffset = mqttContext.getProviders().getPersistenceProvider().getLatestOffset(topic.getTopic()); // retain消费点位优先以缓存为准 Long retainOffset = retainOffsetCache.get(topic); @@ -207,43 +183,23 @@ public class MqttSession extends AbstractSession { retainOffset = oldestRetainOffset; } //以当前消息队列的最新点位为起始点位 - TopicSubscriber subscription = new TopicSubscriber(topic, this, mqttQoS, latestOffset + 1, retainOffset); + TopicSubscriber subscription = new TopicSubscriber(topic, MqttSession.this, mqttQoS, latestOffset + 1, retainOffset); subscription.setTopicFilterToken(topicToken); - // 如果服务端收到一个 SUBSCRIBE 报文, - //报文的主题过滤器与一个现存订阅的主题过滤器相同, - // 那么必须使用新的订阅彻底替换现存的订阅。 - // 新订阅的主题过滤器和之前订阅的相同,但是它的最大 QoS 值可以不同。 - ValidateUtils.isTrue(!disconnect, "session has closed,can not subscribe topic"); - - subscribers.values().forEach(topicFilterSubscriber -> { - TopicSubscriber oldOffset = topicFilterSubscriber.getTopicSubscribers().remove(topic.getTopic()); - if (oldOffset != null) { - TopicSubscriber consumerOffset = oldOffset.getTopic().getConsumeOffsets().remove(this); - consumerOffset.disable(); - LOGGER.info("remove topic:{} {},", topic, oldOffset == consumerOffset ? "success" : "fail"); - } - }); - - TopicFilterSubscriber topicFilterSubscriber = subscribers.get(subscription.getTopicFilterToken().getTopicFilter()); - if (topicFilterSubscriber == null) { - topicFilterSubscriber = new TopicFilterSubscriber(subscription.getTopicFilterToken(), subscription.getMqttQoS(), subscription); - subscribers.put(subscription.getTopicFilterToken().getTopicFilter(), topicFilterSubscriber); - } else { - topicFilterSubscriber.getTopicSubscribers().put(subscription.getTopic().getTopic(), subscription); - } - TopicSubscriber preTopicSubscriber = subscription.getTopic().getConsumeOffsets().put(this, subscription); - if (preTopicSubscriber != null) { - LOGGER.error("invalid state..."); - } else { - LOGGER.debug("new subscribe topic:{} success by topicFilter:{}", subscription.getTopic().getTopic(), subscription.getTopicFilterToken().getTopicFilter()); - } - - return subscription; + topic.getConsumeOffsets().put(MqttSession.this, subscription); + subscribers.get(topicToken.getTopicFilter()).getTopicSubscribers().put(topic, subscription); + mqttContext.getEventBus().publish(ServerEventType.SUBSCRIBE_TOPIC, subscription); } + /** + * retain消息消费点位记录 + */ + private final Map retainOffsetCache = new HashMap<>(); + public void resubscribe() { - subscribers.values().forEach(subscriber -> subscribe0(subscriber.getTopicFilterToken().getTopicFilter(), subscriber.getMqttQoS(), false)); + subscribers.values().stream().filter(subscriber -> subscriber.getTopicFilterToken().isWildcards()).forEach(subscriber -> { + mqttContext.getPublishTopicTree().match(subscriber.getTopicFilterToken(), topic -> subscribeSuccess(subscriber.getMqttQoS(), subscriber.getTopicFilterToken(), topic)); + }); } public void unsubscribe(String topicFilter) { @@ -261,6 +217,7 @@ public class MqttSession extends AbstractSession { LOGGER.error("remove subscriber:{} error!", removeSubscriber); } }); + mqttContext.getTopicSubscribeTree().unsubscribe(this, filterSubscriber); } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicFilterSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicFilterSubscriber.java index 75f1691f99be09998d488d5d01a2afbcad315a88..7047f2ef7e7f347f27172dff05d6ef9ed41dc540 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicFilterSubscriber.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicFilterSubscriber.java @@ -22,25 +22,19 @@ import java.util.Map; * @author 三刀(zhengjunweimail@163.com) * @version V1.0 , 2022/7/13 */ -class TopicFilterSubscriber { +public class TopicFilterSubscriber { private final TopicToken topicFilterToken; - private final MqttQoS mqttQoS; + private MqttQoS mqttQoS; /** * 客户端订阅所匹配的Topic。通配符订阅时可能有多个 */ - private final Map topicSubscribers; + private final Map topicSubscribers = new HashMap<>(); - public TopicFilterSubscriber(TopicToken topicFilterToken, MqttQoS mqttQoS) { + TopicFilterSubscriber(TopicToken topicFilterToken, MqttQoS mqttQoS) { this.topicFilterToken = topicFilterToken; this.mqttQoS = mqttQoS; - this.topicSubscribers = new HashMap<>(); - } - - public TopicFilterSubscriber(TopicToken topicFilterToken, MqttQoS mqttQoS, TopicSubscriber topicSubscriber) { - this(topicFilterToken, mqttQoS); - topicSubscribers.put(topicSubscriber.getTopic().getTopic(), topicSubscriber); } public TopicToken getTopicFilterToken() { @@ -51,7 +45,11 @@ class TopicFilterSubscriber { return mqttQoS; } - public Map getTopicSubscribers() { + public void setMqttQoS(MqttQoS mqttQoS) { + this.mqttQoS = mqttQoS; + } + + public Map getTopicSubscribers() { return topicSubscribers; } } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java index 5c1ed6c83645f2ecfe56ada25c78fb182a07bd90..09378fd580f6cd43bb63f8716df9b1eb2d20d362 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/TopicSubscriber.java @@ -12,17 +12,20 @@ package org.smartboot.mqtt.broker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.smartboot.mqtt.broker.eventbus.ServerEventType; import org.smartboot.mqtt.broker.provider.PersistenceProvider; import org.smartboot.mqtt.broker.provider.impl.message.PersistenceMessage; -import org.smartboot.mqtt.common.InflightMessage; import org.smartboot.mqtt.common.InflightQueue; import org.smartboot.mqtt.common.TopicToken; import org.smartboot.mqtt.common.enums.MqttQoS; import org.smartboot.mqtt.common.enums.MqttVersion; import org.smartboot.mqtt.common.eventbus.EventType; +import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage; +import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader; import org.smartboot.mqtt.common.message.variable.properties.PublishProperties; import org.smartboot.mqtt.common.util.MqttMessageBuilders; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Semaphore; /** @@ -41,7 +44,7 @@ public class TopicSubscriber { /** * 服务端向客户端发送应用消息所允许的最大 QoS 等级 */ - private final MqttQoS mqttQoS; + private MqttQoS mqttQoS; /** * 期望消费的点位 @@ -76,25 +79,27 @@ public class TopicSubscriber { return; } semaphore.release(); - publish0(brokerContext, 0); + int i = 16; + while (publishAvailable(brokerContext)) { + if (i-- == 0) { + if (semaphore.tryAcquire()) { + topic.getQueue().offer(this); + topic.getVersion().incrementAndGet(); + } + break; + } + } mqttSession.flush(); } - private void publish0(BrokerContext brokerContext, final int depth) { + private boolean publishAvailable(BrokerContext brokerContext) { PersistenceProvider persistenceProvider = brokerContext.getProviders().getPersistenceProvider(); PersistenceMessage persistenceMessage = persistenceProvider.get(topic.getTopic(), nextConsumerOffset); if (persistenceMessage == null) { if (semaphore.tryAcquire()) { topic.getQueue().offer(this); } - return; - } - if (depth > 16) { - if (semaphore.tryAcquire()) { - topic.getQueue().offer(this); - topic.getVersion().incrementAndGet(); - } - return; + return false; } MqttMessageBuilders.PublishBuilder publishBuilder = MqttMessageBuilders.publish().payload(persistenceMessage.getPayload()).qos(mqttQoS).topicName(persistenceMessage.getTopic()); @@ -109,38 +114,30 @@ public class TopicSubscriber { //Qos0直接发送 if (mqttQoS == MqttQoS.AT_MOST_ONCE) { mqttSession.write(publishBuilder.build(), false); - publish0(brokerContext, depth + 1); - return; - } - InflightMessage suc; - if (depth == 0) { - suc = inflightQueue.offer(publishBuilder, (mqtt) -> { - //最早发送的消息若收到响应,则更新点位 - commitNextConsumerOffset(offset + 1); - if (persistenceMessage.isRetained()) { - setRetainConsumerOffset(getRetainConsumerOffset() + 1); - } - commitRetainConsumerTimestamp(persistenceMessage.getCreateTime()); - publish0(brokerContext, 1); - }, () -> publish0(brokerContext, 0)); - } else { - suc = inflightQueue.offer(publishBuilder, (mqtt) -> { - //最早发送的消息若收到响应,则更新点位 - commitNextConsumerOffset(offset + 1); - if (persistenceMessage.isRetained()) { - setRetainConsumerOffset(getRetainConsumerOffset() + 1); - } - commitRetainConsumerTimestamp(persistenceMessage.getCreateTime()); - publish0(brokerContext, 1); - }); + return true; } - // 飞行队列已满 - if (suc != null) { - //递归处理下一个消息 - publish0(brokerContext, depth + 1); + CompletableFuture> future = inflightQueue.offer(publishBuilder, () -> { + if (semaphore.tryAcquire()) { + topic.getQueue().offer(this); + topic.getVersion().incrementAndGet(); + } + brokerContext.getEventBus().publish(ServerEventType.NOTIFY_TOPIC_PUSH, topic); + }); + if (future == null) { + return false; } + future.whenComplete((mqttPacketIdentifierMessage, throwable) -> { + //最早发送的消息若收到响应,则更新点位 + commitNextConsumerOffset(offset + 1); + if (persistenceMessage.isRetained()) { + setRetainConsumerOffset(getRetainConsumerOffset() + 1); + } + commitRetainConsumerTimestamp(persistenceMessage.getCreateTime()); + publishAvailable(brokerContext); + }); + return true; } public BrokerTopic getTopic() { @@ -191,4 +188,8 @@ public class TopicSubscriber { public void disable() { this.enable = false; } + + public void setMqttQoS(MqttQoS mqttQoS) { + this.mqttQoS = mqttQoS; + } } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ServerEventType.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ServerEventType.java index d4bf3ef3995a72429dca8b3acd6e44796056950b..4fcc38d3f25d4282dca10c91fd0dfa27021ac959 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ServerEventType.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/eventbus/ServerEventType.java @@ -86,6 +86,8 @@ public class ServerEventType extends EventType { */ public static final ServerEventType> CONNACK = new ServerEventType<>("connect"); + public static final ServerEventType NOTIFY_TOPIC_PUSH = new ServerEventType<>("notify_topic_push"); + protected ServerEventType(String name) { super(name); } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/openapi/controller/DashBoardController.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/openapi/controller/DashBoardController.java index a58408d7b3d1f1d3ef5cde265c50beb1450a317b..447b3bc9239f1cdaf6659bfb070f0bdd511a0e70 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/openapi/controller/DashBoardController.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/openapi/controller/DashBoardController.java @@ -20,19 +20,12 @@ import org.smartboot.http.restful.annotation.RequestMapping; import org.smartboot.mqtt.broker.BrokerConfigure; import org.smartboot.mqtt.broker.BrokerContext; import org.smartboot.mqtt.broker.BrokerRuntime; -import org.smartboot.mqtt.broker.MqttSession; import org.smartboot.mqtt.broker.openapi.OpenApi; import org.smartboot.mqtt.broker.openapi.enums.BrokerStatueEnum; import org.smartboot.mqtt.broker.openapi.to.BrokerNodeTO; -import org.smartboot.mqtt.common.enums.MqttMetricEnum; -import org.smartboot.mqtt.common.to.MetricItemTO; -import org.smartboot.mqtt.common.to.MetricTO; import java.lang.management.ManagementFactory; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; -import java.util.Date; import java.util.List; /** @@ -52,30 +45,8 @@ public class DashBoardController { } @RequestMapping(OpenApi.DASHBOARD_OVERVIEW) - public RestResult overview() { - MetricTO metricTO = new MetricTO(); - Collection sessions = brokerContext.getSessions(); - Date date = new Date(); - //在线客户端数量 - MetricItemTO online = brokerContext.metric(MqttMetricEnum.CLIENT_ONLINE); - online.setTime(date); - metricTO.getMetric().put(MqttMetricEnum.CLIENT_ONLINE.getCode(), online); - - //主题数 - MetricItemTO topicCount = brokerContext.metric(MqttMetricEnum.TOPIC_COUNT); - topicCount.setTime(date); - metricTO.getMetric().put(MqttMetricEnum.TOPIC_COUNT.getCode(), topicCount); - - int subCount = 0; - for (MqttSession session : sessions) { - subCount += session.getSubscribers().size(); - } - MetricItemTO subscribeTopicCount = new MetricItemTO(); - subscribeTopicCount.setCode("subscribe_topic_count"); - subscribeTopicCount.setValue(subCount); - metricTO.getMetric().put(subscribeTopicCount.getCode(), subscribeTopicCount); - - return RestResult.ok(metricTO); + public RestResult overview() { + return RestResult.fail(OpenApi.MESSAGE_UPGRADE); } @RequestMapping(OpenApi.DASHBOARD_NODES) @@ -117,52 +88,8 @@ public class DashBoardController { * @return */ @RequestMapping(OpenApi.DASHBOARD_METRICS) - public RestResult metrics() { - MetricTO metricTO = new MetricTO(); - //连接 - List connectionGroup = new ArrayList<>(); - metricTO.getGroup().put("connection", connectionGroup); - connectionGroup.add(brokerContext.metric(MqttMetricEnum.CLIENT_CONNECT)); - connectionGroup.add(brokerContext.metric(MqttMetricEnum.CLIENT_DISCONNECT)); - connectionGroup.add(brokerContext.metric(MqttMetricEnum.CLIENT_SUBSCRIBE)); - connectionGroup.add(brokerContext.metric(MqttMetricEnum.CLIENT_UNSUBSCRIBE)); - - //会话 - List sessionGroup = new ArrayList<>(); - metricTO.getGroup().put("session", sessionGroup); - //认证与权限 - List accessGroup = new ArrayList<>(); - metricTO.getGroup().put("access", accessGroup); - - //流量收发 - List bytesGroup = new ArrayList<>(); - metricTO.getGroup().put("bytes", bytesGroup); - - //报文 - List packetGroup = new ArrayList<>(); - metricTO.getGroup().put("packet", packetGroup); - packetGroup.add(brokerContext.metric(MqttMetricEnum.BYTES_RECEIVED)); - packetGroup.add(brokerContext.metric(MqttMetricEnum.BYTES_SENT)); - packetGroup.add(brokerContext.metric(MqttMetricEnum.PACKETS_RECEIVED)); - packetGroup.add(brokerContext.metric(MqttMetricEnum.PACKETS_SENT)); - packetGroup.add(brokerContext.metric(MqttMetricEnum.PACKETS_CONNECT_RECEIVED)); - packetGroup.add(brokerContext.metric(MqttMetricEnum.PACKETS_CONNACK_SENT)); - - - //消息数量 - List messageGroup = new ArrayList<>(); - metricTO.getGroup().put("message", messageGroup); - messageGroup.add(brokerContext.metric(MqttMetricEnum.MESSAGE_QOS0_RECEIVED)); - messageGroup.add(brokerContext.metric(MqttMetricEnum.MESSAGE_QOS1_RECEIVED)); - messageGroup.add(brokerContext.metric(MqttMetricEnum.MESSAGE_QOS2_RECEIVED)); - messageGroup.add(brokerContext.metric(MqttMetricEnum.MESSAGE_QOS0_SENT)); - messageGroup.add(brokerContext.metric(MqttMetricEnum.MESSAGE_QOS1_SENT)); - messageGroup.add(brokerContext.metric(MqttMetricEnum.MESSAGE_QOS2_SENT)); - - //消息分发 - List deliveryGroup = new ArrayList<>(); - metricTO.getGroup().put("delivery", deliveryGroup); - return RestResult.ok(metricTO); + public RestResult metrics() { + return RestResult.fail(OpenApi.MESSAGE_UPGRADE); } } diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/SubscribeProcessor.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/SubscribeProcessor.java index b6c9f38ee967883509e25e29d29232c1837a0648..fa281f9c6291f01fdb82a676a4436c9e33fe2246 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/SubscribeProcessor.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/processor/SubscribeProcessor.java @@ -60,6 +60,6 @@ public class SubscribeProcessor extends AuthorizedMqttProcessor subNode = new ConcurrentHashMap<>(); + + public BrokerTopic addTopic(String topic) { + BrokerTopic brokerTopic = new BrokerTopic(topic); + TopicToken topicToken = brokerTopic.getTopicToken(); + TopicPublishTree treeNode = this; + while (true) { + treeNode = treeNode.subNode.computeIfAbsent(topicToken.getNode(), n -> new TopicPublishTree()); + if (topicToken.getNextNode() == null) { + break; + } else { + topicToken = topicToken.getNextNode(); + } + } + treeNode.brokerTopic = new BrokerTopic(topic); + return treeNode.brokerTopic; + } + + public void match(TopicToken topicToken, Consumer consumer) { + match(this, topicToken, consumer); + } + + private void match(TopicPublishTree treeNode, TopicToken topicToken, Consumer consumer) { + //匹配结束 + if (topicToken == null) { + if (treeNode.brokerTopic != null) { + consumer.accept(treeNode.brokerTopic); + } + return; + } + //合法的#通配符必然存在于末端 + if ("#".equals(topicToken.getNode())) { + treeNode.subNode.values().forEach(node -> { + subscribeChildren(node, consumer); + }); + } else if ("+".equals(topicToken.getNode())) { + treeNode.subNode.values().forEach(node -> { + match(node, topicToken.getNextNode(), consumer); + }); + } else { + TopicPublishTree node = treeNode.subNode.get(topicToken.getNode()); + if (node != null) { + match(node, topicToken.getNextNode(), consumer); + } + } + } + + private void subscribeChildren(TopicPublishTree treeNode, Consumer consumer) { + BrokerTopic brokerTopic = treeNode.brokerTopic; + if (brokerTopic != null) { + consumer.accept(brokerTopic); + } + //递归订阅Topic + treeNode.subNode.values().forEach(subNode -> subscribeChildren(subNode, consumer)); + } +} diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java new file mode 100644 index 0000000000000000000000000000000000000000..1ab0df22f8732e3756f44a638c20456bc5220e68 --- /dev/null +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicSubscribeTree.java @@ -0,0 +1,78 @@ +/* + * Copyright (C) [2022] smartboot [zhengjunweimail@163.com] + * + * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。 + * + * Enterprise users are required to use this project reasonably + * and legally in accordance with the AGPL-3.0 open source agreement + * without special permission from the smartboot organization. + */ + +package org.smartboot.mqtt.broker.topic; + +import org.smartboot.mqtt.broker.MqttSession; +import org.smartboot.mqtt.broker.TopicFilterSubscriber; +import org.smartboot.mqtt.common.TopicToken; +import org.smartboot.mqtt.common.util.ValidateUtils; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.BiConsumer; + +/** + * @author 三刀(zhengjunweimail@163.com) + * @version V1.0 , 5/28/23 + */ +public class TopicSubscribeTree { + private final Map subscribers = new ConcurrentHashMap<>(); + private final ConcurrentHashMap subNode = new ConcurrentHashMap<>(); + + public void subscribeTopic(MqttSession session, TopicFilterSubscriber subscriber) { + TopicSubscribeTree treeNode = this; + TopicToken token = subscriber.getTopicFilterToken(); + do { + treeNode = treeNode.subNode.computeIfAbsent(token.getNode(), n -> new TopicSubscribeTree()); + } while ((token = token.getNextNode()) != null); + treeNode.subscribers.put(session, subscriber); + } + + public void unsubscribe(MqttSession session, TopicFilterSubscriber subscriber) { + TopicSubscribeTree subscribeTree = this; + TopicToken topicToken = subscriber.getTopicFilterToken(); + while (true) { + subscribeTree = subscribeTree.subNode.get(topicToken.getNode()); + if (topicToken.getNextNode() == null) { + break; + } + topicToken = topicToken.getNextNode(); + } + subscribeTree.subscribers.remove(session); + } + + + public void match(TopicToken topicToken, BiConsumer consumer) { + //精确匹配 + TopicSubscribeTree subscribeTree = subNode.get(topicToken.getNode()); + if (subscribeTree != null) { + if (topicToken.getNextNode() == null) { + subscribers.forEach(consumer); + } else { + subscribeTree.match(topicToken.getNextNode(), consumer); + } + } + subscribeTree = subNode.get("#"); + if (subscribeTree != null) { + ValidateUtils.isTrue(subscribeTree.subNode.isEmpty(), "'#' node must be empty"); + subscribeTree.subscribers.forEach(consumer); + } + + subscribeTree = subNode.get("+"); + if (subscribeTree != null) { + if (topicToken.getNextNode() == null) { + subscribers.forEach(consumer); + } else { + subscribeTree.subNode.values().forEach(t -> match(topicToken.getNextNode(), consumer)); + } + } + } +} diff --git a/smart-mqtt-client/pom.xml b/smart-mqtt-client/pom.xml index b93a632bcb280a36d0d58843226da751386bd6d9..aa4dfbf9cd12a4aad0506a89a2763d9f4ebb1af4 100644 --- a/smart-mqtt-client/pom.xml +++ b/smart-mqtt-client/pom.xml @@ -5,7 +5,7 @@ smart-mqtt org.smartboot.mqtt - 0.20 + 0.21 ../pom.xml 4.0.0 @@ -27,5 +27,10 @@ junit test + + org.smartboot.mqtt + smart-mqtt-broker + test + \ No newline at end of file diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java index be39b3cfe0f5cb4a1f5df3d096f78a6d08660a50..c4160ddb4f2409b70d23c0d6f68f0687f21aaac6 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClient.java @@ -15,7 +15,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.smartboot.mqtt.common.AbstractSession; import org.smartboot.mqtt.common.DefaultMqttWriter; -import org.smartboot.mqtt.common.InflightMessage; import org.smartboot.mqtt.common.InflightQueue; import org.smartboot.mqtt.common.QosRetryPlugin; import org.smartboot.mqtt.common.TopicToken; @@ -28,6 +27,7 @@ import org.smartboot.mqtt.common.message.MqttConnAckMessage; import org.smartboot.mqtt.common.message.MqttConnectMessage; import org.smartboot.mqtt.common.message.MqttDisconnectMessage; import org.smartboot.mqtt.common.message.MqttMessage; +import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage; import org.smartboot.mqtt.common.message.MqttPingReqMessage; import org.smartboot.mqtt.common.message.MqttPingRespMessage; import org.smartboot.mqtt.common.message.MqttPublishMessage; @@ -38,6 +38,7 @@ import org.smartboot.mqtt.common.message.MqttUnsubAckMessage; import org.smartboot.mqtt.common.message.payload.MqttConnectPayload; import org.smartboot.mqtt.common.message.payload.WillMessage; import org.smartboot.mqtt.common.message.variable.MqttConnectVariableHeader; +import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader; import org.smartboot.mqtt.common.message.variable.properties.ConnectProperties; import org.smartboot.mqtt.common.message.variable.properties.PublishProperties; import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties; @@ -60,6 +61,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ThreadFactory; @@ -75,7 +77,12 @@ public class MqttClient extends AbstractSession { * 客户端配置项 */ private final MqttClientConfigure clientConfigure = new MqttClientConfigure(); - private final AbstractMessageProcessor messageProcessor = new MqttClientProcessor(this); + private static final AbstractMessageProcessor messageProcessor = new MqttClientProcessor(); + + static { + messageProcessor.addPlugin(new QosRetryPlugin()); + } + /** * 完成connect之前注册的事件 */ @@ -85,6 +92,8 @@ public class MqttClient extends AbstractSession { */ private final Map subscribes = new ConcurrentHashMap<>(); + private final Map mapping = new ConcurrentHashMap<>(); + private final List wildcardsToken = new LinkedList<>(); private AioQuickClient client; @@ -169,11 +178,11 @@ public class MqttClient extends AbstractSession { if (mqttConnAckMessage.getVariableHeader().connectReturnCode() == MqttConnectReturnCode.CONNECTION_ACCEPTED) { setInflightQueue(new InflightQueue(this, 16)); connected = true; - consumeTask(); //重连情况下重新触发订阅逻辑 subscribes.forEach((k, v) -> { subscribe(k, v.getQoS(), v.getConsumer()); }); + consumeTask(); } //客户端设置清理会话(CleanSession)标志为 0 重连时,客户端和服务端必须使用原始的报文标识符重发 //任何未确认的 PUBLISH 报文(如果 QoS>0)和 PUBREL 报文 [MQTT-4.4.0-1]。这是唯一要求客户端或 @@ -215,7 +224,7 @@ public class MqttClient extends AbstractSession { }, clientConfigure.getKeepAliveInterval(), TimeUnit.SECONDS); } // messageProcessor.addPlugin(new StreamMonitorPlugin<>()); - messageProcessor.addPlugin(new QosRetryPlugin()); + client = new AioQuickClient(clientConfigure.getHost(), clientConfigure.getPort(), new MqttProtocol(clientConfigure.getMaxPacketSize()), messageProcessor); try { if (bufferPagePool != null) { @@ -223,7 +232,9 @@ public class MqttClient extends AbstractSession { } client.setReadBufferSize(clientConfigure.getBufferSize()).setWriteBuffer(clientConfigure.getBufferSize(), 8).connectTimeout(clientConfigure.getConnectionTimeout()); session = client.start(asynchronousChannelGroup); - session.setAttachment(new Attachment()); + Attachment attachment = new Attachment(); + session.setAttachment(attachment); + attachment.put(MqttClientProcessor.SESSION_KEY, this); setMqttVersion(clientConfigure.getMqttVersion()); mqttWriter = new DefaultMqttWriter(session.writeBuffer()); @@ -299,12 +310,17 @@ public class MqttClient extends AbstractSession { unsubscribeBuilder.properties(properties); } // wait ack message. - getInflightQueue().offer(unsubscribeBuilder, (message) -> { + CompletableFuture> future = getInflightQueue().offer(unsubscribeBuilder, () -> registeredTasks.offer(() -> unsubscribe0(topics))); + if (future == null) { + return; + } + future.whenComplete((message, throwable) -> { ValidateUtils.isTrue(message instanceof MqttUnsubAckMessage, "uncorrected message type."); for (String unsubscribedTopic : unsubscribedTopics) { subscribes.remove(unsubscribedTopic); wildcardsToken.removeIf(topicToken -> StringUtils.equals(unsubscribedTopic, topicToken.getTopicFilter())); } + mapping.clear(); consumeTask(); }); flush(); @@ -343,7 +359,18 @@ public class MqttClient extends AbstractSession { subscribeBuilder.subscribeProperties(new SubscribeProperties()); } MqttSubscribeMessage subscribeMessage = subscribeBuilder.build(); - getInflightQueue().offer(subscribeBuilder, (message) -> { + + CompletableFuture> future = getInflightQueue().offer(subscribeBuilder, new Runnable() { + @Override + public void run() { + + } + }); + if (future == null) { + registeredTasks.offer(() -> subscribe0(topic, qos, consumer, subAckConsumer)); + return; + } + future.whenComplete((message, throwable) -> { List qosValues = ((MqttSubAckMessage) message).getPayload().grantedQoSLevels(); ValidateUtils.isTrue(qosValues.size() == qos.length, "invalid response"); int i = 0; @@ -360,11 +387,13 @@ public class MqttClient extends AbstractSession { } else { LOGGER.error("subscribe topic:{} fail", subscription.getTopicFilter()); } + mapping.clear(); subAckConsumer.accept(MqttClient.this, minQos); } consumeTask(); }); flush(); + } public void notifyResponse(MqttConnAckMessage connAckMessage) { @@ -425,25 +454,8 @@ public class MqttClient extends AbstractSession { consumer.accept(0); return; } - InflightQueue inflightQueue = getInflightQueue(); - InflightMessage inflightMessage = inflightQueue.offer(publishBuilder, (message) -> { - consumer.accept(message.getVariableHeader().getPacketId()); - //最早发送的消息若收到响应,则更新点位 - synchronized (MqttClient.this) { - MqttClient.this.notifyAll(); - } - }); - if (inflightMessage == null) { - try { - synchronized (this) { - wait(); - } - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - publish(publishBuilder, consumer, autoFlush); - return; - } + CompletableFuture> future = inflightQueue.put(publishBuilder); + future.whenComplete((message, throwable) -> consumer.accept(message.getVariableHeader().getPacketId())); if (autoFlush) { flush(); } @@ -457,6 +469,10 @@ public class MqttClient extends AbstractSession { return subscribes; } + public Map getMapping() { + return mapping; + } + public List getWildcardsToken() { return wildcardsToken; } @@ -468,6 +484,9 @@ public class MqttClient extends AbstractSession { */ @Override public void disconnect() { + if (disconnect) { + return; + } //DISCONNECT 报文是客户端发给服务端的最后一个控制报文。表示客户端正常断开连接。 write(new MqttDisconnectMessage()); //关闭自动重连 diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClientProcessor.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClientProcessor.java index b15876b6258c72556a0efc6d6447efed0427feee..e216c56340bd71d0d096e5b4f0e19f815b8473d9 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClientProcessor.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/MqttClientProcessor.java @@ -20,6 +20,7 @@ import org.smartboot.mqtt.client.processor.PubRelProcessor; import org.smartboot.mqtt.client.processor.PublishProcessor; import org.smartboot.mqtt.common.eventbus.EventObject; import org.smartboot.mqtt.common.eventbus.EventType; +import org.smartboot.mqtt.common.exception.MqttException; import org.smartboot.mqtt.common.message.MqttConnAckMessage; import org.smartboot.mqtt.common.message.MqttMessage; import org.smartboot.mqtt.common.message.MqttPingRespMessage; @@ -29,9 +30,12 @@ import org.smartboot.mqtt.common.message.MqttPubRecMessage; import org.smartboot.mqtt.common.message.MqttPubRelMessage; import org.smartboot.mqtt.common.message.MqttPublishMessage; import org.smartboot.mqtt.common.message.MqttSubAckMessage; +import org.smartboot.mqtt.common.util.MqttAttachKey; import org.smartboot.socket.StateMachineEnum; import org.smartboot.socket.extension.processor.AbstractMessageProcessor; import org.smartboot.socket.transport.AioSession; +import org.smartboot.socket.util.AttachKey; +import org.smartboot.socket.util.Attachment; import java.util.HashMap; import java.util.Map; @@ -42,7 +46,7 @@ import java.util.Map; */ public class MqttClientProcessor extends AbstractMessageProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(MqttClientProcessor.class); - private final MqttClient mqttClient; + final static AttachKey SESSION_KEY = AttachKey.valueOf(MqttAttachKey.MQTT_SESSION); private static final Map, MqttProcessor> processors = new HashMap<>(); static { @@ -52,21 +56,20 @@ public class MqttClientProcessor extends AbstractMessageProcessor { processors.put(MqttPubRecMessage.class, new MqttAckProcessor()); processors.put(MqttPubCompMessage.class, new MqttAckProcessor()); processors.put(MqttPubRelMessage.class, new PubRelProcessor()); - processors.put(MqttSubAckMessage.class, new MqttAckProcessor()); + processors.put(MqttSubAckMessage.class, new MqttAckProcessor()); processors.put(MqttPingRespMessage.class, new MqttPingRespProcessor()); } - public MqttClientProcessor(MqttClient mqttClient) { - this.mqttClient = mqttClient; - } @Override public void process0(AioSession session, MqttMessage msg) { - mqttClient.getEventBus().publish(EventType.RECEIVE_MESSAGE, EventObject.newEventObject(mqttClient, msg)); + Attachment attachment = session.getAttachment(); + MqttClient client = attachment.get(SESSION_KEY); + client.getEventBus().publish(EventType.RECEIVE_MESSAGE, EventObject.newEventObject(client, msg)); MqttProcessor processor = processors.get(msg.getClass()); // LOGGER.info("receive msg:{}", msg); if (processor != null) { - processor.process(mqttClient, msg); + processor.process(client, msg); } else { LOGGER.error("unknown msg:{}", msg); } @@ -74,9 +77,25 @@ public class MqttClientProcessor extends AbstractMessageProcessor { @Override public void stateEvent0(AioSession session, StateMachineEnum stateMachineEnum, Throwable throwable) { -// System.out.println(stateMachineEnum); - if (throwable != null) { - throwable.printStackTrace(); + switch (stateMachineEnum) { + case DECODE_EXCEPTION: + LOGGER.error("decode exception", throwable); + break; + case SESSION_CLOSED: + Attachment attachment = session.getAttachment(); + attachment.get(SESSION_KEY).disconnect(); + break; + case PROCESS_EXCEPTION: + if (throwable instanceof MqttException) { + LOGGER.warn("process exception", throwable); + ((MqttException) throwable).getCallback().run(); + } + break; + default: + break; } +// if (throwable != null) { +// throwable.printStackTrace(); +// } } } diff --git a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java index 3b053b0718d41ca3ac91eee311f8014845e39654..d69fd428b646c2eee0570a246942751d3b422c7e 100644 --- a/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java +++ b/smart-mqtt-client/src/main/java/org/smartboot/mqtt/client/processor/PublishProcessor.java @@ -61,12 +61,18 @@ public class PublishProcessor implements MqttProcessor { private void processPublishMessage(MqttPublishMessage mqttPublishMessage, MqttClient mqttClient) { MqttPublishVariableHeader header = mqttPublishMessage.getVariableHeader(); - Subscribe subscribe = mqttClient.getSubscribes().get(header.getTopicName()); - - //尝试通配符匹配 + Subscribe subscribe = mqttClient.getMapping().get(header.getTopicName()); if (subscribe == null) { - subscribe = matchWildcardsSubscribe(mqttClient, header.getTopicName()); + subscribe = mqttClient.getSubscribes().get(header.getTopicName()); + //尝试通配符匹配 + if (subscribe == null) { + subscribe = matchWildcardsSubscribe(mqttClient, header.getTopicName()); + } + if (subscribe != null) { + mqttClient.getMapping().put(header.getTopicName(), subscribe); + } } + // If unsubscribed, maybe null. if (subscribe != null && !subscribe.getUnsubscribed()) { subscribe.getConsumer().accept(mqttClient, mqttPublishMessage); diff --git a/smart-mqtt-client/src/test/java/org/smartboot/mqtt/client/Benchmark.java b/smart-mqtt-client/src/test/java/org/smartboot/mqtt/client/Benchmark.java index 9b29430898431e0798e82b8d27055e5cbb39d206..bfa3262589362bd3f558621001ae6ee68163424e 100644 --- a/smart-mqtt-client/src/test/java/org/smartboot/mqtt/client/Benchmark.java +++ b/smart-mqtt-client/src/test/java/org/smartboot/mqtt/client/Benchmark.java @@ -1,8 +1,13 @@ package org.smartboot.mqtt.client; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.smartboot.mqtt.broker.BrokerContext; +import org.smartboot.mqtt.broker.BrokerContextImpl; import org.smartboot.mqtt.common.enums.MqttQoS; import org.smartboot.mqtt.common.util.MqttUtil; @@ -10,6 +15,7 @@ import java.io.IOException; import java.nio.channels.AsynchronousChannelGroup; import java.nio.charset.StandardCharsets; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** @@ -17,18 +23,24 @@ import java.util.concurrent.atomic.AtomicLong; * @version V1.0 , 2022/4/18 */ public class Benchmark { + private static final Logger LOGGER = LoggerFactory.getLogger(Benchmark.class); private final String host = "127.0.0.1"; private final int port = 1883; private AsynchronousChannelGroup channelGroup; + private BrokerContext context; + @Before public void init() throws IOException { channelGroup = AsynchronousChannelGroup.withFixedThreadPool(4, r -> new Thread(r)); + context = new BrokerContextImpl(); + context.init(); } @After public void destroy() { channelGroup.shutdown(); + context.destroy(); } /** @@ -81,12 +93,38 @@ public class Benchmark { System.out.println((System.currentTimeMillis() - s) + ":" + total.get()); } + @Test(timeout = 3000) + public void testSubscribe2() throws InterruptedException { + MqttClient client = new MqttClient(host, port, MqttUtil.createClientId()); + client.connect(channelGroup); + + int connectCount = 10000; + AtomicLong total = new AtomicLong(0); + CountDownLatch countDownLatch = new CountDownLatch(connectCount); + long s = System.currentTimeMillis(); + + while (connectCount-- > 0) { + long start = System.currentTimeMillis(); + String topic = "/topic/" + connectCount; + client.subscribe(topic, MqttQoS.AT_MOST_ONCE, (mqttClient, publishMessage) -> { + }, (mqttClient, mqttQoS) -> { +// LOGGER.info("subscribe:{}", "/topic/" + topic); + total.addAndGet(System.currentTimeMillis() - start); + countDownLatch.countDown(); + }); + + } + countDownLatch.await(); + client.disconnect(); + System.out.println((System.currentTimeMillis() - s) + ":" + total.get()); + } + /** * 订阅topic成功后断开连接 * * @throws InterruptedException */ - @Test + @Test(timeout = 5000) public void testPublish() throws InterruptedException { int connectCount = 100; int publishCount = Short.MAX_VALUE; @@ -117,7 +155,7 @@ public class Benchmark { client.publish(topic, MqttQoS.AT_MOST_ONCE, payload, false); } System.out.println("publish finish!"); - publishDownLatch.await(); - System.out.println("publish use time: " + (System.currentTimeMillis() - startPublish)); + Assert.assertTrue("wait result timeout", publishDownLatch.await(5, TimeUnit.SECONDS)); + System.out.println("publish use time: " + (System.currentTimeMillis() - startPublish) + " count:" + publishDownLatch.getCount()); } } diff --git a/smart-mqtt-common/pom.xml b/smart-mqtt-common/pom.xml index 806073bedb739ac80dea9befe92aa3f7910c5fd1..2a2c23d2c2580a00d988208b2f9448fd8f9a8c46 100644 --- a/smart-mqtt-common/pom.xml +++ b/smart-mqtt-common/pom.xml @@ -5,7 +5,7 @@ smart-mqtt org.smartboot.mqtt - 0.20 + 0.21 ../pom.xml 4.0.0 diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java index 8c7d78a6158197de8d9f761f10daff318e8d2335..3099892d327feac92320479db63159d5059292e9 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/AbstractSession.java @@ -25,8 +25,7 @@ import org.smartboot.socket.util.Attachment; import java.io.IOException; import java.net.InetSocketAddress; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.Hashtable; /** * @author 三刀(zhengjunweimail@163.com) @@ -54,8 +53,8 @@ public abstract class AbstractSession { private MqttVersion mqttVersion; - private InflightQueue inflightQueue; - private final Map ackMessageCacheMap = new ConcurrentHashMap<>(); + protected InflightQueue inflightQueue; + private final Hashtable ackMessageCacheMap = new Hashtable<>(); public AbstractSession(EventBus eventBus) { this.eventBus = eventBus; @@ -77,7 +76,7 @@ public abstract class AbstractSession { public final synchronized void write(MqttMessage mqttMessage, boolean autoFlush) { try { if (disconnect) { - this.disconnect(); +// this.disconnect(); ValidateUtils.isTrue(false, "已断开连接,无法发送消息"); } mqttMessage.setVersion(mqttVersion); diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightMessage.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightMessage.java index 168c10d77fbf377fc25fb38152e59822ccfb6b96..4d10632fce23ceb4a597fe3a02b9d9e8b70b3118 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightMessage.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightMessage.java @@ -18,7 +18,7 @@ import org.smartboot.mqtt.common.message.MqttSubscribeMessage; import org.smartboot.mqtt.common.message.MqttUnsubscribeMessage; import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader; -import java.util.function.Consumer; +import java.util.concurrent.CompletableFuture; /** * @author 三刀(zhengjunweimail@163.com) @@ -31,6 +31,7 @@ public class InflightMessage { private final MqttPacketIdentifierMessage originalMessage; private MqttPacketIdentifierMessage responseMessage; + private final CompletableFuture> future = new CompletableFuture<>(); /** * 飞行队列为其分配的packetId */ @@ -40,16 +41,13 @@ public class InflightMessage { private boolean commit; - private final Consumer> consumer; - private int retryCount; private long latestTime; - public InflightMessage(int packetId, MqttPacketIdentifierMessage originalMessage, Consumer> consumer) { + public InflightMessage(int packetId, MqttPacketIdentifierMessage originalMessage) { this.assignedPacketId = packetId; this.originalMessage = originalMessage; - this.consumer = consumer; if (originalMessage instanceof MqttSubscribeMessage) { this.expectMessageType = MqttMessageType.SUBACK; } else if (originalMessage instanceof MqttUnsubscribeMessage) { @@ -104,10 +102,6 @@ public class InflightMessage { this.latestTime = latestTime; } - public final Consumer> getConsumer() { - return consumer; - } - public int getAssignedPacketId() { return assignedPacketId; } @@ -120,4 +114,7 @@ public class InflightMessage { this.responseMessage = responseMessage; } + public CompletableFuture> getFuture() { + return future; + } } diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java index efb4f18992c0d53c7c2e566c80e470338add6859..9859587effad42794871fdf5ccd7b9b51e913b81 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java @@ -14,24 +14,28 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.smartboot.mqtt.common.enums.MqttMessageType; import org.smartboot.mqtt.common.enums.MqttVersion; +import org.smartboot.mqtt.common.exception.MqttException; import org.smartboot.mqtt.common.message.MqttFixedHeader; import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage; import org.smartboot.mqtt.common.message.MqttPubRelMessage; import org.smartboot.mqtt.common.message.MqttPublishMessage; +import org.smartboot.mqtt.common.message.MqttSubscribeMessage; import org.smartboot.mqtt.common.message.MqttVariableMessage; import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader; import org.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader; import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties; +import org.smartboot.mqtt.common.util.MqttAttachKey; import org.smartboot.mqtt.common.util.MqttMessageBuilders; import org.smartboot.mqtt.common.util.ValidateUtils; import org.smartboot.socket.util.AttachKey; import org.smartboot.socket.util.Attachment; import org.smartboot.socket.util.QuickTimerTask; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; /** * @author 三刀(zhengjunweimail@163.com) @@ -39,8 +43,8 @@ import java.util.function.Consumer; */ public class InflightQueue { private static final Logger LOGGER = LoggerFactory.getLogger(InflightQueue.class); - static final AttachKey RETRY_TASK_ATTACH_KEY = AttachKey.valueOf("retryTask"); - private static final int TIMEOUT = 3; + static final AttachKey RETRY_TASK_ATTACH_KEY = AttachKey.valueOf(MqttAttachKey.RETRY_TASK); + private static final int TIMEOUT = 30; private final InflightMessage[] queue; private int takeIndex; private int putIndex; @@ -49,7 +53,10 @@ public class InflightQueue { private final AtomicInteger packetId = new AtomicInteger(0); private final AbstractSession session; - private final ConcurrentLinkedQueue runnables = new ConcurrentLinkedQueue<>(); + + private final ReentrantLock lock = new ReentrantLock(false); + + private final Condition notFull = lock.newCondition(); public InflightQueue(AbstractSession session, int size) { ValidateUtils.isTrue(size > 0, "inflight must >0"); @@ -57,40 +64,69 @@ public class InflightQueue { this.session = session; } - public InflightMessage offer(MqttMessageBuilders.MessageBuilder publishBuilder, Consumer> consumer) { - return offer(publishBuilder, consumer, null); + public CompletableFuture> put(MqttMessageBuilders.MessageBuilder publishBuilder) { + final ReentrantLock lock = this.lock; + try { + lock.lockInterruptibly(); + while (count == queue.length) { + notFull.await(); + } + return enqueue(publishBuilder); + } catch (Exception e) { + throw new MqttException("put message into inflight queue exception", e); + } finally { + lock.unlock(); + } + } + + public CompletableFuture> offer(MqttMessageBuilders.MessageBuilder publishBuilder) { + return offer(publishBuilder, () -> { + + }); } - public InflightMessage offer(MqttMessageBuilders.MessageBuilder publishBuilder, Consumer> consumer, Runnable runnable) { - InflightMessage inflightMessage; - synchronized (this) { + public CompletableFuture> offer(MqttMessageBuilders.MessageBuilder publishBuilder, Runnable runnable) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { if (count == queue.length) { - if (runnable != null) { - runnables.offer(runnable); + int i = putIndex - 1; + if (i < 0) { + i = queue.length - 1; } + queue[i].getFuture().thenAccept(mqttPacketIdentifierMessage -> runnable.run()); return null; + } else { + return enqueue(publishBuilder); } - int id = packetId.incrementAndGet(); - // 16位无符号最大值65535 - if (id > 65535) { - id = id % queue.length + queue.length; - packetId.set(id); - } - MqttPacketIdentifierMessage mqttMessage = publishBuilder.packetId(id).build(); - inflightMessage = new InflightMessage(id, mqttMessage, consumer); - queue[putIndex++] = inflightMessage; - if (putIndex == queue.length) { - putIndex = 0; - } - count++; + } finally { + lock.unlock(); + } + } - //启动消息质量监测 - if (count == 1) { - retry(inflightMessage); - } + + public CompletableFuture> enqueue(MqttMessageBuilders.MessageBuilder publishBuilder) { + + int id = packetId.incrementAndGet(); + // 16位无符号最大值65535 + if (id > 65535) { + id = id % queue.length + queue.length; + packetId.set(id); + } + MqttPacketIdentifierMessage mqttMessage = publishBuilder.packetId(id).build(); + InflightMessage inflightMessage = new InflightMessage(id, mqttMessage); + queue[putIndex++] = inflightMessage; + if (putIndex == queue.length) { + putIndex = 0; + } + count++; + + //启动消息质量监测 + if (count == 1) { + retry(inflightMessage); } session.write(inflightMessage.getOriginalMessage(), count == queue.length); - return inflightMessage; + return inflightMessage.getFuture(); } /** @@ -100,14 +136,14 @@ public class InflightQueue { if (inflightMessage.isCommit() || session.isDisconnect()) { return; } - QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(new Runnable() { + QuickTimerTask.SCHEDULED_EXECUTOR_SERVICE.schedule(new AsyncTask() { @Override - public void run() { + public void execute() { if (inflightMessage.isCommit()) { // System.out.println("message has commit,ignore retry monitor"); return; } - if (session.isDisconnect()) { + if (session.session.isInvalid()) { LOGGER.debug("session is disconnect , pause qos monitor."); return; } @@ -137,6 +173,11 @@ public class InflightQueue { MqttPubRelMessage pubRelMessage = new MqttPubRelMessage(MqttFixedHeader.PUB_REL_HEADER_DUP, variableHeader); session.write(pubRelMessage); break; + case SUBACK: + MqttSubscribeMessage subscribeMessage = (MqttSubscribeMessage) inflightMessage.getOriginalMessage(); + MqttSubscribeMessage dupSubscribeMessage = new MqttSubscribeMessage(MqttFixedHeader.SUBSCRIBE_HEADER_DUP, subscribeMessage.getVariableHeader(), subscribeMessage.getPayload()); + session.write(dupSubscribeMessage); + break; default: throw new UnsupportedOperationException("invalid message type: " + inflightMessage.getExpectMessageType()); } @@ -152,6 +193,10 @@ public class InflightQueue { */ public void notify(MqttPacketIdentifierMessage message) { InflightMessage inflightMessage = queue[(message.getVariableHeader().getPacketId() - 1) % queue.length]; + if (inflightMessage == null) { + LOGGER.info("ignore duplicate message"); + return; + } switch (message.getFixedHeader().getMessageType()) { case SUBACK: case UNSUBACK: @@ -163,7 +208,13 @@ public class InflightQueue { } inflightMessage.setResponseMessage(message); inflightMessage.setLatestTime(System.currentTimeMillis()); - commit(inflightMessage); + final ReentrantLock lock = this.lock; + lock.lock(); + try { + commit(inflightMessage); + } finally { + lock.unlock(); + } break; } case PUBREC: @@ -189,7 +240,7 @@ public class InflightQueue { } } - public synchronized void commit(InflightMessage inflightMessage) { + private void commit(InflightMessage inflightMessage) { MqttVariableMessage originalMessage = inflightMessage.getOriginalMessage(); ValidateUtils.isTrue(originalMessage.getFixedHeader().getQosLevel().value() == 0 || originalMessage.getVariableHeader().getPacketId() == inflightMessage.getAssignedPacketId(), "invalid message"); inflightMessage.setCommit(true); @@ -199,34 +250,28 @@ public class InflightQueue { } queue[takeIndex++] = null; count--; + if (takeIndex == queue.length) { takeIndex = 0; } - inflightMessage.getConsumer().accept(inflightMessage.getResponseMessage()); + inflightMessage.getFuture().complete(inflightMessage.getResponseMessage()); while (count > 0 && queue[takeIndex].isCommit()) { inflightMessage = queue[takeIndex]; - inflightMessage.getConsumer().accept(inflightMessage.getResponseMessage()); + inflightMessage.getFuture().complete(inflightMessage.getResponseMessage()); queue[takeIndex++] = null; if (takeIndex == queue.length) { takeIndex = 0; } count--; } - + if (count < queue.length) { + notFull.signal(); + } if (count > 0) { //注册超时监听任务 Attachment attachment = session.session.getAttachment(); InflightMessage monitorMessage = queue[takeIndex]; attachment.put(RETRY_TASK_ATTACH_KEY, () -> session.getInflightQueue().retry(monitorMessage)); } - - while (count < queue.length) { - Runnable runnable = runnables.poll(); - if (runnable != null) { - runnable.run(); - } else { - break; - } - } } } \ No newline at end of file diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MetricTypeEnum.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MetricTypeEnum.java deleted file mode 100644 index 0553ab12543de67420265d8e4ccb1de5675b5a75..0000000000000000000000000000000000000000 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MetricTypeEnum.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright (C) [2022] smartboot [zhengjunweimail@163.com] - * - * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。 - * - * Enterprise users are required to use this project reasonably - * and legally in accordance with the AGPL-3.0 open source agreement - * without special permission from the smartboot organization. - */ - -package org.smartboot.mqtt.common.enums; - -/** - * @author 三刀(zhengjunweimail@163.com) - * @version V1.0 , 2023/2/19 - */ -public enum MetricTypeEnum { - /** - * 基础指标:指表达业务实体原子量化属性的且不可再分的概念集合 - */ - BASIC, - /** - * 复合指标:指建立在基础指标之上,通过一定运算规则形成的计算指标集合 - */ - COMPOSITE -} diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttMetricEnum.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttMetricEnum.java deleted file mode 100644 index b9609df08ad6642eaa5f536a64d113c002279ca4..0000000000000000000000000000000000000000 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/enums/MqttMetricEnum.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright (C) [2022] smartboot [zhengjunweimail@163.com] - * - * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。 - * - * Enterprise users are required to use this project reasonably - * and legally in accordance with the AGPL-3.0 open source agreement - * without special permission from the smartboot organization. - */ - -package org.smartboot.mqtt.common.enums; - -/** - * @author 三刀(zhengjunweimail@163_com) - * @version V1_0 , 2023/1/26 - */ -public enum MqttMetricEnum { - CLIENT_ONLINE("client_online", "客户端在线数"), - CLIENT_CONNECT("client_connect", "客户端连接次数"), - CLIENT_DISCONNECT("client_disconnected", "客户端断开连接次数"), - CLIENT_SUBSCRIBE("client_subscribe", "订阅次数"), - CLIENT_UNSUBSCRIBE("client_unsubscribe", "取消订阅次数"), - BYTES_RECEIVED("bytes_received", "已接收字节数"), - BYTES_SENT("bytes_sent", "已发送字节数"), - - PACKETS_CONNECT_RECEIVED("packets_connect_received", "接收的 CONNECT 报文数量"), - PACKETS_CONNACK_SENT("packets_connack_sent", "发送的 CONNACK 报文数量"), - - PACKETS_PUBLISH_RECEIVED("packets_publish_received", "接收的 PUBLISH 报文数量"), - PACKETS_PUBLISH_SENT("packets_publish_sent", "发送的 PUBLISH 报文数量"), - - PACKETS_RECEIVED("packets_received", "接收的报文数量"), - PACKETS_SENT("packets_sent", "发送的报文数量"), - - - TOPIC_COUNT("topic_count", "Topic数量"), - - MESSAGE_QOS0_RECEIVED("messages_qos0_received", "接收来自客户端的 QoS 0 消息数量"), - MESSAGE_QOS1_RECEIVED("messages_qos1_received", "接收来自客户端的 QoS 1 消息数量"), - MESSAGE_QOS2_RECEIVED("messages_qos2_received", "接收来自客户端的 QoS 2 消息数量"), - MESSAGE_QOS0_SENT("messages_qos0_sent", "发送给客户端的 QoS 0 消息数量"), - MESSAGE_QOS1_SENT("messages_qos1_sent", "发送给客户端的 QoS 1 消息数量"), - MESSAGE_QOS2_SENT("messages_qos2_sent", "发送给客户端的 QoS 2 消息数量"), - - PERIOD_MESSAGE_RECEIVED("period_message_received", "周期内接收消息数", MetricTypeEnum.COMPOSITE), - - PERIOD_MESSAGE_SENT("period_message_sent", "周期内发送消息数", MetricTypeEnum.COMPOSITE); - - private final String code; - private final String desc; - - private final MetricTypeEnum type; - - MqttMetricEnum(String code, String desc, MetricTypeEnum type) { - this.code = code; - this.desc = desc; - this.type = type; - } - - MqttMetricEnum(String code, String desc) { - this(code, desc, MetricTypeEnum.BASIC); - } - - public String getCode() { - return code; - } - - public String getDesc() { - return desc; - } - - public MetricTypeEnum getType() { - return type; - } -} diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttProtocol.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttProtocol.java index 47769341fc5a9cb71b2ebf377176a37eab0b5c9b..07cffa571022360c6fb29fc50ad3e01f23456848 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttProtocol.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/protocol/MqttProtocol.java @@ -16,6 +16,7 @@ import org.smartboot.mqtt.common.enums.MqttMessageType; import org.smartboot.mqtt.common.enums.MqttVersion; import org.smartboot.mqtt.common.message.MqttFixedHeader; import org.smartboot.mqtt.common.message.MqttMessage; +import org.smartboot.mqtt.common.util.MqttAttachKey; import org.smartboot.mqtt.common.util.ValidateUtils; import org.smartboot.socket.Protocol; import org.smartboot.socket.transport.AioSession; @@ -36,8 +37,8 @@ public class MqttProtocol implements Protocol { private static final Logger logger = LoggerFactory.getLogger(MqttProtocol.class); private final int maxBytesInMessage; - public static final AttachKey MQTT_VERSION_ATTACH_KEY = AttachKey.valueOf("mqtt_version"); - private static final AttachKey DECODE_UNIT_ATTACH_KEY = AttachKey.valueOf("decodeUnit"); + public static final AttachKey MQTT_VERSION_ATTACH_KEY = AttachKey.valueOf(MqttAttachKey.MQTT_VERSION); + private static final AttachKey DECODE_UNIT_ATTACH_KEY = AttachKey.valueOf(MqttAttachKey.DECODE_UNIT); public MqttProtocol(int maxBytesInMessage) { diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/to/MetricItemTO.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/to/MetricItemTO.java deleted file mode 100644 index 9b031cfb45d44f1c5a100f6835852a6fd4a58a6f..0000000000000000000000000000000000000000 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/to/MetricItemTO.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Copyright (C) [2022] smartboot [zhengjunweimail@163.com] - * - * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。 - * - * Enterprise users are required to use this project reasonably - * and legally in accordance with the AGPL-3.0 open source agreement - * without special permission from the smartboot organization. - */ - -package org.smartboot.mqtt.common.to; - -import com.alibaba.fastjson2.annotation.JSONField; -import org.smartboot.mqtt.common.enums.MqttMetricEnum; - -import java.util.Date; -import java.util.concurrent.atomic.LongAdder; - -/** - * @author 三刀(zhengjunweimail@163.com) - * @version V1.0 , 2023/1/26 - */ -public class MetricItemTO { - /** - * 指标编码 - */ - private String code; - /** - * 指标描述 - */ - private String desc; - - /** - * 指标数据 - */ - @JSONField(serialize = false) - private final LongAdder metric = new LongAdder(); - - /** - * 采集周期,单位:秒,非正整数表示禁用周期统计 - */ - private final int period; - /** - * 未启用周期采集改值为null - */ - @JSONField(format = "yyyy-MM-dd HH:mm:ss") - private Date time; - - - public MetricItemTO() { - this.period = 0; - } - - public MetricItemTO(MqttMetricEnum metricEnum) { - this(metricEnum, 0); - } - - public MetricItemTO(MqttMetricEnum metricEnum, int period) { - this.code = metricEnum.getCode(); - this.desc = metricEnum.getDesc(); - this.period = period; - } - - public String getCode() { - return code; - } - - public void setCode(String code) { - this.code = code; - } - - public String getDesc() { - return desc; - } - - public void setDesc(String desc) { - this.desc = desc; - } - - public int getValue() { - return metric.intValue(); - } - - public void setValue(int value) { - metric.reset(); - metric.add(value); - } - - public LongAdder getMetric() { - return metric; - } - - public int getPeriod() { - return period; - } - - public Date getTime() { - return time; - } - - public void setTime(Date time) { - this.time = time; - } -} diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/to/MetricTO.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttAttachKey.java similarity index 42% rename from smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/to/MetricTO.java rename to smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttAttachKey.java index 17b44d0562847859b62d31eed86260dbd86bcc3b..68142ccd896f13e44a13538ae53ac94bd1dfebdd 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/to/MetricTO.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/util/MqttAttachKey.java @@ -8,33 +8,27 @@ * without special permission from the smartboot organization. */ -package org.smartboot.mqtt.common.to; +package org.smartboot.mqtt.common.util; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import org.smartboot.socket.util.AttachKey; /** * @author 三刀(zhengjunweimail@163.com) - * @version V1.0 , 2023/1/22 + * @version V1.0 , 5/27/23 */ -public class MetricTO { - /** - * 指标项 - */ - private final Map metric = new HashMap<>(); +public class MqttAttachKey { + public static final String MQTT_VERSION = "mqtt_version"; - /** - * 指标分组 - */ - private final Map> group = new HashMap<>(); + public static final String DECODE_UNIT = "decode_unit"; + public static final String RETRY_TASK = "retry_task"; + public static final String MQTT_SESSION = "mqtt_session"; - - public Map> getGroup() { - return group; - } - - public Map getMetric() { - return metric; + static { + AttachKey.reset(); + AttachKey.valueOf(MQTT_SESSION); + AttachKey.valueOf(MQTT_VERSION); + AttachKey.valueOf(DECODE_UNIT); + AttachKey.valueOf(RETRY_TASK); + AttachKey.reset(); } }